Re: Broadcast state and OutOfMemoryError: Direct buffer memory

2022-10-21 Thread yanfei lei
Hi Dan,
Broadcast state usually needs more network buffers. The network buffers used
to exchange data records between tasks are allocated from direct
memory [1], so I think it is possible to hit the "Direct buffer memory" OOM
error in this scenario. Maybe you can try increasing
taskmanager.memory.framework.off-heap.size and
taskmanager.memory.task.off-heap.size.
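
As a rough illustration of the suggestion above, the two options could be
raised in flink-conf.yaml like this. The values are placeholders picked only
for the example, not recommendations. As far as I know the defaults are 128m
for the framework off-heap size and 0 for the task off-heap size, so please
check them against the docs for your Flink version.

```yaml
# flink-conf.yaml (illustrative values only; tune them for your job)
taskmanager.memory.framework.off-heap.size: 256m
taskmanager.memory.task.off-heap.size: 128m
```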

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/#detailed-memory-model

Best,
Yanfei

On Fri, Oct 21, 2022 at 15:39, Dan Hill wrote:

> Hi.  My team recently added broadcast state to our Flink jobs.  We've
> started hitting this OOM "Direct buffer memory" error.  Is this a common
> problem with broadcast state?  Or is it likely a different problem?
> Thanks! - Dan
>


Difference between DataStream.broadcast() vs DataStream.broadcast(MapStateDescriptor)

2022-10-21 Thread Qing Lim
Hi all, I am trying to figure out how DataStream.broadcast() and 
DataStream.broadcast(MapStateDescriptor) differ.

My use case:
I have 2 streams:
Stream 1 contains updates, which collectively build up a state.
Stream 2 is keyed, and every parallel instance needs to receive EVERY update 
from Stream 1.

I am thinking I can probably achieve this by doing

Stream1.broadcast().connect(stream2).process(myFun)

I am failing to understand when I would need to use the Broadcast State 
pattern. Is it a convenience method built on top of broadcast(), or is it 
something very different?

The best info I've found is this SO question: 
https://stackoverflow.com/questions/50570605/why-broadcast-state-can-store-the-dynamic-rules-however-broadcast-operator-c
It seems to suggest that Broadcast State calls broadcast() and then maintains 
state in each parallel operator under the hood?

Kind regards.

Qing Lim | Marshall Wace LLP, George House, 131 Sloane Street, London | E-mail: 
q@mwam.com | Tel: +44 207 925 4865



Re: Modify savepoints in Flink

2022-10-21 Thread Sriram Ganesh
Thanks, I'll check it out.

On Fri, Oct 21, 2022, 18:20 Piotr Nowojski  wrote:

> Hi,
>
> Yes and no. StateProcessor API can read any Flink state, but you have to
> describe the state you want it to access. Take a look at the example in the
> docs [1].
>
> First you have an example of a theoretical production function
> `StatefulFunctionWithTime`, which state you want to modify. Note the
> `ValueState` and `ListState` fields and their descriptors. That's the state
> of that particular function. Descriptors determine how the state is
> serialised. Usually they are pretty simple.
> Below is the `ReaderFunction`, that you want to use to access/modify the
> state via the StateProcessor API. To do so, you have to specify the state
> you want to access and effectively mimic/copy paste the state descriptors
> from the production code.
>
> If you want to modify the state of a source/sink function, you would have
> to first take a look into the source code of such a connector to know what
> to modify and copy its descriptors. Also note that for source/sink the
> state is most likely non-keyed.
>
> Best,
> Piotrek
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/#keyed-state
>
> On Fri, 21 Oct 2022 at 14:37, Sriram Ganesh wrote:
>
>> I have question on this. Different connector can have different
>> serialisation and de-serlisation technique right?. Wouldn't that impact?.
>> If I use StateProcessor API, would that be agnostic to all the sources and
>> sinks?.
>>
>> On Fri, Oct 21, 2022, 18:00 Piotr Nowojski  wrote:
>>
>>> ops
>>>
>>> > Alternatively, you can modify a code of your function/operator for
>>> which you want to modify the state. For example in the
>>> `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
>>> method you could add some code that would do a migration of your old state
>>> to a new one.
>>> > And you can drop such code later, in the next savepoint.
>>>
>>> That was not entirely true. This would work for the non-keyed state. For
>>> the keyed state there is no easy alternative (you would have to iterate
>>> through all of the keys, which I think is not exposed via Public API) -
>>> best to use StateProcessor API.
>>>
>>> Best,
>>> Piotrek


Re: Cannot run pyflink example using Flink CLI

2022-10-21 Thread Levan Huyen
Great, thanks!

Kind regards,
Levan Huyen

On Fri, 21 Oct 2022 at 00:53, Biao Geng  wrote:

> You are right.
> It contains the python package `pyflink` and some dependencies like py4j
> and cloudpickle but does not contain all relevant dependencies(e.g.
> `google.protobuf` as the error log shows, which I also reproduce in my own
> machine).
>
> Best,
> Biao Geng
>
> On Thu, Oct 20, 2022 at 19:53, Levan Huyen wrote:
>
>> Thanks Biao.
>>
>> May I ask one more question: does the binary package on Apache site (e.g:
>> https://archive.apache.org/dist/flink/flink-1.15.2) contain the python
>> package `pyflink` and its dependencies? I guess the answer is no.
>>
>> Thanks and regards,
>> Levan Huyen
>>
>> On Thu, 20 Oct 2022 at 18:13, Biao Geng  wrote:
>>
>>> Hi Levan,
>>> Great to hear that your issue is resolved!
>>> For the follow-up question, I am not quite familiar with AWS EMR's
>>> configuration for flink, but judging from the error you attached, it looks
>>> like pyflink may not ship some 'Google' dependencies in the Flink binary
>>> zip file and, as a result, it will try to find them in your python
>>> environment. cc @hxbks...@gmail.com
>>> For now, to manage the complex python dependencies, the typical usage of
>>> pyflink in multiple node clusters for production is to create your venv and
>>> use it in your `flink run` command or in the python code. You can refer to
>>> this doc
>>> 
>>> for details.
>>>
>>> Best,
>>> Biao Geng
>>>
>>> On Thu, Oct 20, 2022 at 14:11, Levan Huyen wrote:
>>>
>>>> Hi Biao,
>>>>
>>>> Thanks for your help. That solved my issue. It turned out that in
>>>> setup1 (in EMR), I got apache-flink installed, but the package (and its
>>>> dependencies) are not in the directory `/usr/lib/python3.7/site-packages`
>>>> (corresponding to the python binary in `/usr/bin/python3`). For some
>>>> reason, the packages are in the current user's location (`~/.local/...`)
>>>> which Flink did not look at.
>>>>
>>>> BTW, is there any way to use the pyflink shipped with the Flink binary
>>>> zip file that I downloaded from Apache's site? On EMR, such package is
>>>> included, and I feel it's awkward to have to install another version using
>>>> `pip install`. It will also be confusing about where to add the
>>>> dependencies jars.
>>>>
>>>> Thanks and regards,
>>>> Levan Huyen
>>>>
>>>>
>>>> On Thu, 20 Oct 2022 at 02:25, Biao Geng  wrote:

>>>>> Hi Levan,
>>>>>
>>>>> For your setup1 & 2, it looks like the python environment is not
>>>>> ready. Have you tried `python -m pip install apache-flink` for the
>>>>> first 2 setups?
>>>>> For your setup3, as you are trying to use the `flink run ...` command, it
>>>>> will try to connect to a launched flink cluster, but I guess you did not
>>>>> launch the flink cluster. You can run `start-cluster.sh` first to launch a
>>>>> standalone flink cluster and then try the `flink run ...` command.
>>>>> For your setup4, the reason why it works well is that it will use the
>>>>> default mini cluster to run the pyflink job. So even if you haven't started
>>>>> a standalone cluster, it can work as well.
>>>>>
>>>>> Best,
>>>>> Biao Geng
>>>>>
>>>>> On Wed, Oct 19, 2022 at 17:07, Levan Huyen wrote:
>
>>>>>> Hi,
>>>>>>
>>>>>> I'm new to PyFlink, and I couldn't run a basic example that shipped
>>>>>> with Flink.
>>>>>> This is the command I tried:
>>>>>>
>>>>>> ./bin/flink run -py examples/python/datastream/word_count.py
>>>>>>
>>>>>> Here below are the results I got with different setups:
>>>>>>
>>>>>> 1. On AWS EMR 6.8.0 (Flink 1.15.1):
>>>>>> *Error: No module named 'google'*
>>>>>> I tried with the Flink shipped with EMR, or the binary v1.15.1/v1.15.2
>>>>>> downloaded from Flink site. I got that same error message in all cases.
>>>>>>
>>>>>> Traceback (most recent call last):
>>>>>>   File "/usr/lib64/python3.7/runpy.py", line 193, in _run_module_as_main
>>>>>>     "__main__", mod_spec)
>>>>>>   File "/usr/lib64/python3.7/runpy.py", line 85, in _run_code
>>>>>>     exec(code, run_globals)
>>>>>>   File "/tmp/pyflink/622f19cc-6616-45ba-b7cb-20a1462c4c3f/a29eb7a4-fff1-4d98-9b38-56b25f97b70a/word_count.py", line 134, in
>>>>>>     word_count(known_args.input, known_args.output)
>>>>>>   File "/tmp/pyflink/622f19cc-6616-45ba-b7cb-20a1462c4c3f/a29eb7a4-fff1-4d98-9b38-56b25f97b70a/word_count.py", line 89, in word_count
>>>>>>     ds = ds.flat_map(split) \
>>>>>>   File "/usr/lib/flink/opt/python/pyflink.zip/pyflink/datastream/data_stream.py", line 333, in flat_map
>>>>>>   File "/usr/lib/flink/opt/python/pyflink.zip/pyflink/datastream/data_stream.py", line 557, in process
>>>>>>   File "/usr/lib/flink/opt/python/pyflink.zip/pyflink/fn_execution/flink_fn_execution_pb2.py", line 23, in
>>>>>> ModuleNotFoundError: No module named 'google'
>>

Re: Modify savepoints in Flink

2022-10-21 Thread Piotr Nowojski
Hi,

Yes and no. The StateProcessor API can read any Flink state, but you have to
describe the state you want it to access. Take a look at the example in the
docs [1].

First you have an example of a theoretical production function
`StatefulFunctionWithTime`, whose state you want to modify. Note the
`ValueState` and `ListState` fields and their descriptors. That's the state
of that particular function. Descriptors determine how the state is
serialised. Usually they are pretty simple.
Below that is the `ReaderFunction`, which you use to access/modify the
state via the StateProcessor API. To do so, you have to specify the state
you want to access and effectively mimic/copy-paste the state descriptors
from the production code.

If you want to modify the state of a source/sink function, you would have
to first take a look into the source code of such a connector to know what
to modify and copy its descriptors. Also note that for source/sink the
state is most likely non-keyed.

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/#keyed-state

On Fri, 21 Oct 2022 at 14:37, Sriram Ganesh wrote:

> I have question on this. Different connector can have different
> serialisation and de-serlisation technique right?. Wouldn't that impact?.
> If I use StateProcessor API, would that be agnostic to all the sources and
> sinks?.
>
> On Fri, Oct 21, 2022, 18:00 Piotr Nowojski  wrote:
>
>> ops
>>
>> > Alternatively, you can modify a code of your function/operator for
>> which you want to modify the state. For example in the
>> `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
>> method you could add some code that would do a migration of your old state
>> to a new one.
>> > And you can drop such code later, in the next savepoint.
>>
>> That was not entirely true. This would work for the non-keyed state. For
>> the keyed state there is no easy alternative (you would have to iterate
>> through all of the keys, which I think is not exposed via Public API) -
>> best to use StateProcessor API.
>>
>> Best,
>> Piotrek
>>
>> On Fri, 21 Oct 2022 at 10:54, Sriram Ganesh wrote:
>>
>>> Thanks !. Will try this.
>>>
>>> --
>>> *Sriram G*
>>> *Tech*
>>>
>>>


Re: Modify savepoints in Flink

2022-10-21 Thread Sriram Ganesh
I have a question on this. Different connectors can have different
serialisation and deserialisation techniques, right? Wouldn't that have an
impact? If I use the StateProcessor API, would it be agnostic to all sources
and sinks?

On Fri, Oct 21, 2022, 18:00 Piotr Nowojski  wrote:

> ops
>
> > Alternatively, you can modify a code of your function/operator for which
> you want to modify the state. For example in the
> `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
> method you could add some code that would do a migration of your old state
> to a new one.
> > And you can drop such code later, in the next savepoint.
>
> That was not entirely true. This would work for the non-keyed state. For
> the keyed state there is no easy alternative (you would have to iterate
> through all of the keys, which I think is not exposed via Public API) -
> best to use StateProcessor API.
>
> Best,
> Piotrek
>
> On Fri, 21 Oct 2022 at 10:54, Sriram Ganesh wrote:
>
>> Thanks !. Will try this.
>>
>> On Fri, Oct 21, 2022 at 2:22 PM Piotr Nowojski 
>> wrote:
>>
>>> Hi Sriram,
>>>
>>> You can read and modify savepoints using StateProcessor API [1].
>>>
>>> Alternatively, you can modify a code of your function/operator for which
>>> you want to modify the state. For example in the
>>> `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
>>> method you could add some code that would do a migration of your old state
>>> to a new one.
>>>
>>> ```
>>> // State registered under the old and the new descriptors
>>> private transient ValueState oldState;
>>> private transient ValueState newState;
>>> (...)
>>> initializeState(...) {
>>>   (...)
>>>   if (newState.value() == null && oldState.value() != null) {
>>>     // migrate the old value into the new state, then clear the old state
>>>     newState.update(migrate(oldState.value()));
>>>     oldState.update(null);
>>>   }
>>> }
>>> ```
>>>
>>> And you can drop such code later, in the next savepoint.
>>>
>>> Best,
>>> Piotrek
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/
>>
>> --
>> *Sriram G*
>> *Tech*
>>
>>


Re: Modify savepoints in Flink

2022-10-21 Thread Piotr Nowojski
Oops.

> Alternatively, you can modify a code of your function/operator for which
> you want to modify the state. For example in the
> `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
> method you could add some code that would do a migration of your old state
> to a new one.
> And you can drop such code later, in the next savepoint.

That was not entirely true. This would work for non-keyed state. For
keyed state there is no easy alternative (you would have to iterate
through all of the keys, which I think is not exposed via the public API),
so it is best to use the StateProcessor API.
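
To make the non-keyed case a bit more concrete, here is a minimal,
self-contained sketch of the migrate-on-restore logic. It deliberately does
not use Flink classes: `InMemoryListState` is a hypothetical stand-in for an
operator `ListState`, and `migrate` is a made-up conversion (String to Long)
chosen only for the example. In a real job the two states would presumably be
obtained inside `initializeState` from the operator state store, so treat
this as an outline of the control flow rather than working Flink code.

```java
import java.util.ArrayList;
import java.util.List;

public class OperatorStateMigrationSketch {

    /** Hypothetical in-memory stand-in for a non-keyed list state (not a Flink class). */
    public static final class InMemoryListState<T> {
        private final List<T> items = new ArrayList<>();

        public List<T> get() { return new ArrayList<>(items); }
        public void add(T item) { items.add(item); }
        public void clear() { items.clear(); }
        public boolean isEmpty() { return items.isEmpty(); }
    }

    /** Made-up conversion from the old representation (String) to the new one (Long). */
    public static Long migrate(String oldValue) {
        return Long.parseLong(oldValue.trim());
    }

    /**
     * Migrate-on-restore: if the new state is still empty and the old state has
     * entries, convert every old entry, add it to the new state, and clear the old one.
     */
    public static void migrateOnRestore(InMemoryListState<String> oldState,
                                        InMemoryListState<Long> newState) {
        if (newState.isEmpty() && !oldState.isEmpty()) {
            for (String value : oldState.get()) {
                newState.add(migrate(value));
            }
            oldState.clear();
        }
    }

    public static void main(String[] args) {
        InMemoryListState<String> oldState = new InMemoryListState<>();
        InMemoryListState<Long> newState = new InMemoryListState<>();
        oldState.add("42");
        oldState.add(" 7 ");

        migrateOnRestore(oldState, newState);
        System.out.println(newState.get());     // prints [42, 7]
        System.out.println(oldState.isEmpty()); // prints true
    }
}
```

The guard makes the migration safe to run on every restore: once a savepoint
taken after the migration is used, the new state is already populated and the
branch is skipped, which is why the migration code can be dropped afterwards.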

Best,
Piotrek

On Fri, 21 Oct 2022 at 10:54, Sriram Ganesh wrote:

> Thanks !. Will try this.
>
> On Fri, Oct 21, 2022 at 2:22 PM Piotr Nowojski 
> wrote:
>
>> Hi Sriram,
>>
>> You can read and modify savepoints using StateProcessor API [1].
>>
>> Alternatively, you can modify a code of your function/operator for which
>> you want to modify the state. For example in the
>> `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
>> method you could add some code that would do a migration of your old state
>> to a new one.
>>
>> ```
>> // State registered under the old and the new descriptors
>> private transient ValueState oldState;
>> private transient ValueState newState;
>> (...)
>> initializeState(...) {
>>   (...)
>>   if (newState.value() == null && oldState.value() != null) {
>>     // migrate the old value into the new state, then clear the old state
>>     newState.update(migrate(oldState.value()));
>>     oldState.update(null);
>>   }
>> }
>> ```
>>
>> And you can drop such code later, in the next savepoint.
>>
>> Best,
>> Piotrek
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/
>>
>> On Fri, 21 Oct 2022 at 10:05, Sriram Ganesh wrote:
>>
>>> Hi All,
>>>
>>> I am working on a scenario where I need to modify the existing savepoint
>>> operator state. Ex: Wanted to remove some offset of the savepoint.
>>>
>>> What is the better practice for these scenarios?. Could you please help
>>> me with any example as such?
>>>
>>> Thanks in advance.
>>>
>>> --
>>> *Sriram G*
>>> *Tech*
>>>
>>>
>
> --
> *Sriram G*
> *Tech*
>
>


Re: Passing SSLContext to the ElasticSearchSink

2022-10-21 Thread Martijn Visser
Hi Nick,

I would say open a PR so one of the maintainers can have a look :)

Thanks,

Martijn

On Sun, Oct 9, 2022 at 3:16 AM Nick Levandoski 
wrote:

> Hello,
>
> I've had an issue similar to the one reported in ticket FLINK-27054. I
> would like to pass the SSLContext to the ElasticSearchSink via the
> NetworkClientConfig class.
>
> Would this be a good solution?
>
> Thanks,
> Nick
>


Re: Modify savepoints in Flink

2022-10-21 Thread Sriram Ganesh
Thanks! Will try this.

On Fri, Oct 21, 2022 at 2:22 PM Piotr Nowojski  wrote:

> Hi Sriram,
>
> You can read and modify savepoints using StateProcessor API [1].
>
> Alternatively, you can modify a code of your function/operator for which
> you want to modify the state. For example in the
> `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
> method you could add some code that would do a migration of your old state
> to a new one.
>
> ```
> // State registered under the old and the new descriptors
> private transient ValueState oldState;
> private transient ValueState newState;
> (...)
> initializeState(...) {
>   (...)
>   if (newState.value() == null && oldState.value() != null) {
>     // migrate the old value into the new state, then clear the old state
>     newState.update(migrate(oldState.value()));
>     oldState.update(null);
>   }
> }
> ```
>
> And you can drop such code later, in the next savepoint.
>
> Best,
> Piotrek
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/
>
> On Fri, 21 Oct 2022 at 10:05, Sriram Ganesh wrote:
>
>> Hi All,
>>
>> I am working on a scenario where I need to modify the existing savepoint
>> operator state. Ex: Wanted to remove some offset of the savepoint.
>>
>> What is the better practice for these scenarios?. Could you please help
>> me with any example as such?
>>
>> Thanks in advance.
>>
>> --
>> *Sriram G*
>> *Tech*
>>
>>

-- 
*Sriram G*
*Tech*


Re: Modify savepoints in Flink

2022-10-21 Thread Piotr Nowojski
Hi Sriram,

You can read and modify savepoints using the StateProcessor API [1].

Alternatively, you can modify the code of the function/operator whose state
you want to change. For example, in the
`org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
method you could add some code that migrates your old state
to a new one.

```
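// State registered under the old and the new descriptors
private transient ValueState oldState;
private transient ValueState newState;
(...)
initializeState(...) {
  (...)
  if (newState.value() == null && oldState.value() != null) {
    // migrate the old value into the new state, then clear the old state
    newState.update(migrate(oldState.value()));
    oldState.update(null);
  }
}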
```

And you can drop that code later, after taking the next savepoint.

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/

On Fri, 21 Oct 2022 at 10:05, Sriram Ganesh wrote:

> Hi All,
>
> I am working on a scenario where I need to modify the existing savepoint
> operator state. Ex: Wanted to remove some offset of the savepoint.
>
> What is the better practice for these scenarios?. Could you please help me
> with any example as such?
>
> Thanks in advance.
>
> --
> *Sriram G*
> *Tech*
>
>


Modify savepoints in Flink

2022-10-21 Thread Sriram Ganesh
Hi All,

I am working on a scenario where I need to modify the operator state in an
existing savepoint. For example, I want to remove some offsets from the
savepoint.

What is the best practice for such scenarios? Could you please point me to
an example?

Thanks in advance.

-- 
*Sriram G*
*Tech*


Broadcast state and OutOfMemoryError: Direct buffer memory

2022-10-21 Thread Dan Hill
Hi.  My team recently added broadcast state to our Flink jobs.  We've
started hitting this OOM "Direct buffer memory" error.  Is this a common
problem with broadcast state?  Or is it likely a different problem?
Thanks! - Dan