Re: Long checkpoint duration for Kafka source operators

2021-05-20 Thread Hubert Chen
For the poor soul that stumbles upon this in the future, just increase your
JM resources.

I thought for sure this must have been the TM experiencing some sort of
backpressure. I tried everything from enabling universal compaction to
unaligned checkpoints to profiling the TM. It wasn't until I enabled AWS
debug logs that I noticed the JM makes a lot of DELETE requests to AWS
after each successful checkpoint. If the checkpoint interval is short and the
JM resources are limited, then I believe the checkpoint barrier will be delayed,
causing long start delays. The JM is too busy making AWS requests to inject
the barrier. After I increased the JM resources, the long start delays
disappeared.
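
For anyone looking for the relevant knobs: JM resources are configured in
flink-conf.yaml, roughly along these lines (values purely illustrative, and
kubernetes.jobmanager.cpu / yarn.appmaster.vcores depending on the deployment):

jobmanager.memory.process.size: 4g
kubernetes.jobmanager.cpu: 2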

On Thu, May 13, 2021 at 1:56 PM Hubert Chen  wrote:

> Hello,
>
> I have an application that reads from two Kafka sources, joins them, and
> produces to a Kafka sink. The application is experiencing long end to end
> checkpoint durations for the Kafka source operators. I'm hoping I could get
> some direction in how to debug this further.
>
> Here is a UI screenshot of a checkpoint instance:
>
> [image: checkpoint.png]
>
> My goal is to bring the total checkpoint duration to sub-minute.
>
> Here are some observations I made:
>
>- Each source operator task has an E2E checkpoint duration of 1m 7s
>- Each source operator task has sub 100ms sync, async, aligned
>buffered, and start delay
>- Each join operator task has a start delay of 1m 7s
>- There is no backpressure in any operator
>
> These observations lead me to believe that the source operator is
> taking a long time to checkpoint. I find this a bit strange, as
> the fused operator is fairly light. It deserializes the event, assigns
> a watermark, and might perform two filters. In addition, it's odd that all
> tasks of both source operators have the same E2E checkpoint duration.
>
> Is there some sort of locking that's occurring on the source operators
> that can explain these long E2E durations?
>
> Best,
> Hubert
>


Re: PyFlink DataStream union type mismatch

2021-05-20 Thread Dian Fu
Hi Wouter,

1) For the exception, it seems a bug. I have filed a ticket for it: 
https://issues.apache.org/jira/browse/FLINK-22733 


2) Regarding your requirements, I guess you should do it as follows:
```
init_stream = (operator_stream
    .filter(lambda r: r[0] is None)
    .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
)

stateful_operator_stream = (operator_stream
    .filter(lambda r: r[0] is not None)
    .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
)

init_stream.union(stateful_operator_stream).key_by(lambda x: x[0], Types.STRING())
```

The reason is that `union` turns a `KeyedStream` into a `DataStream`, and you
cannot perform stateful operations on a `DataStream` any more.

Regards,
Dian

> On May 21, 2021, at 12:38 AM, Wouter Zorgdrager  wrote:
> 
> Dear all,
> 
> I'm having trouble unifying two data streams using the union operator in 
> PyFlink. My code basically looks like this:
> 
> init_stream = (operator_stream
>.filter(lambda r: r[0] is None)
>
> .map(init_operator,Types.TUPLE([Types.STRING(),Types.PICKLED_BYTE_ARRAY()]))
>.key_by(lambda x: x[0], Types.STRING())
> )
> 
> stateful_operator_stream = (operator_stream
> .filter(lambda r: r[0] is not None)
> .map(lambda x: (x[0], x[1]),Types.TUPLE([Types.STRING(), 
> Types.PICKLED_BYTE_ARRAY()]))
> .key_by(lambda x: x[0],Types.STRING())
> )
> 
> print(init_stream)
> print(init_stream.get_type())
> 
> print(stateful_operator_stream.get_type())
> print(stateful_operator_stream)
> 
> final_operator_stream = init_stream
> .union(stateful_operator_stream)
> .process(stateful_operator)
> 
> 
> In short, I have a datastream (operator_stream) of type Tuple[str, Event] 
> which I define as a tuple of Types.STRING() and Types.PICKLED.BYTE_ARRAY().
> When calling the union operator, I get an error which shows a type mismatch 
> between both streams:
> 
> py4j.protocol.Py4JJavaError: An error occurred while calling o732.union.
> : java.lang.IllegalArgumentException: Cannot union streams of different 
> types: Java Tuple2 and Row(f0: String, f1: 
> Java Tuple2)
>   at 
> org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:238)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>   at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> 
> However, when I print the types of both datastreams they seem similar:
> 
> 
> RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
> 
> RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
> 
> 
> Any thoughts? Thanks in advance!
> 
> Regards,
> Wouter



Re: Issues with forwarding environment variables

2021-05-20 Thread Yangze Guo
Hi, Milind

Could you provide the skeleton of your job code? If you implement a
custom function, like the Tokenizer in the WordCount example, the class
members are initialized on the client side and then serialized to the
task manager. As a result, neither the system envs nor the system
properties of the TaskManager will be used.

If that is the case, you can initialize the `serviceName` field in the
map/flatMap or open function. Then it will read the TM's envs or
properties instead.
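
A minimal sketch of that approach (the class name and the map logic are just
placeholders):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ServiceNameMapper extends RichMapFunction<String, String> {

    // not resolved on the client; resolved on the TaskManager instead
    private transient String serviceName;

    @Override
    public void open(Configuration parameters) {
        // runs on the TaskManager, so it sees the TM's environment/properties
        serviceName = System.getenv("SERVICE_NAME");
        // or: serviceName = System.getProperty("SERVICE_NAME");
    }

    @Override
    public String map(String value) {
        return serviceName + ":" + value;
    }
}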

Best,
Yangze Guo


On Fri, May 21, 2021 at 5:40 AM Milind Vaidya  wrote:
>
> This is java code. I have a flink job running and it is trying to fetch this 
> variable at run time itself. I see the properties getting reflected in the 
> logs as already mentioned but not visible from the code.
>
> On Thu, May 20, 2021 at 1:53 PM Roman Khachatryan  wrote:
>>
>> > private String serviceName = System.getenv("SERVICE_NAME");
>> Is it a scala object? If so, it can be initialized before any
>> properties are set.
>> What happens if the variable/property is read later at run time?
>>
>> Regards,
>> Roman
>>
>> On Thu, May 20, 2021 at 10:41 PM Milind Vaidya  wrote:
>> >
>> > here are the entries from taskmanager logs
>> >
>> > 2021-05-20 13:34:13,739 INFO 
>> > org.apache.flink.configuration.GlobalConfiguration - Loading configuration 
>> > property: env.java.opts.taskmanager, 
>> > "-DSERVICE_NAME=hello-test,-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
>> > 2021-05-20 13:34:13,740 INFO 
>> > org.apache.flink.configuration.GlobalConfiguration - Loading configuration 
>> > property: jobmanager.execution.failover-strategy, region
>> > 2021-05-20 13:34:13,742 INFO 
>> > org.apache.flink.configuration.GlobalConfiguration - Loading configuration 
>> > property: containerized.taskmanager.env.SERVICE_NAME, "hello-test"
>> > 2021-05-20 13:34:13,743 INFO 
>> > org.apache.flink.configuration.GlobalConfiguration - Loading configuration 
>> > property: containerized.master.env.SERVICE_NAME, "hello-test"
>> >
>> > But the error still persists
>> >
>> >
>> > On Thu, May 20, 2021 at 1:20 PM Roman Khachatryan  wrote:
>> >>
>> >> Thanks, it should work. I've created a ticket to track the issue [1].
>> >> Could you please specify Flink and Yarn versions you are using?
>> >>
>> >> You can also use properties (which don't depend on Yarn integration),
>> >> for example like this:
>> >> In flink-conf.yaml: env.java.opts.taskmanager: -DSERVICE_NAME=...
>> >> In the application: System.getProperty("SERVICE_NAME");
>> >>
>> >> Regards,
>> >> Roman
>> >>
>> >> On Thu, May 20, 2021 at 9:50 PM Milind Vaidya  wrote:
>> >> >
>> >> >
>> >> > Hi Roman,
>> >> >
>> >> > I have added following lines to conf/flink-conf.yaml
>> >> >
>> >> > containerized.taskmanager.env.SERVICE_NAME: "test_service_name"
>> >> > containerized.master.env.SERVICE_NAME: "test_service_name"
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > On Thu, May 20, 2021 at 12:30 PM Roman Khachatryan  
>> >> > wrote:
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> Could you please share the relevant parts of your flink-conf.yaml?
>> >> >>
>> >> >> Regards,
>> >> >> Roman
>> >> >>
>> >> >> On Thu, May 20, 2021 at 9:13 PM Milind Vaidya  
>> >> >> wrote:
>> >> >> >
>> >> >> > Hi
>> >> >> >
>> >> >> > Need to forward a few env variables to Job and Task manager.
>> >> >> > I am running jobs in Yarn cluster
>> >> >> > I was referring to this : Forwarding
>> >> >> >
>> >> >> > I also found Stack Overflow
>> >> >> >
>> >> >> > I was able to configure and see the variables in Flink Dashboard
>> >> >> >
>> >> >> > But the task manager logs stills says
>> >> >> >
>> >> >> > `The system environment variable SERVICE_NAME is missing` as an 
>> >> >> > exception message.
>> >> >> >
>> >> >> > The code trying to fetch it is as follows
>> >> >> >
>> >> >> > private String serviceName = System.getenv("SERVICE_NAME");
>> >> >> >
>> >> >> > Is the fetched one not the same as set one ? How to set / fetch 
>> >> >> > environment variables in such case ?
>> >> >> >


Re: Could watermark could be took into consideration after the channel become active from idle at once?

2021-05-20 Thread 刘建刚
We have hit the same problem in our company. One stream always has data. The
other stream is much smaller and can be idle. Once the smaller one becomes
active, its data may be dropped in this case.

张静 [via Apache Flink User Mailing List archive.]
<ml+s2336050n43873...@n4.nabble.com> wrote on Fri, May 21, 2021 at 10:24 AM:

> Hi, roman
>Thanks for reply very much.
>In our case, we see some data was dropped in window operator. We
> found the root cause by adding a temporary metric about number of
> aligned channels, found an active channel resumed from Idle was not
> took into account for some time (not alway btw). We could walk around
> by allow lateness on window operator, however I wonder does it make
> sense to add a configuration to define different watermark aligned
> behavior?
>
> Regards
> JING
>
> Roman Khachatryan <[hidden email]> wrote on Fri, May 21, 2021 at 1:05 AM:
>
> >
> > Hi,
> >
> > AFAIK, this behavior is not configurable.
> > However, for this to happen the channel must consistently generate
> > watermarks smaller than watermarks from ALL aligned channels (and its
> > elements must have a smaller timestamp). I'm not sure how likely it
> > is. Is it something you see in production?
> >
> > Regards,
> > Roman
> >
> > On Thu, May 20, 2021 at 4:11 PM 张静 <[hidden email]
> > wrote:
> > >
> > > Hi community,
> > > Now after a channel become active from idle, the watermark on this
> > > channel would not be took into account when align watermarks util it
> > > generates a watermark equals to or bigger than last emitted watermark.
> > > It makes sense because it could prevent the newly active task resumed
> > > from idle dragging down the entire job.
> > >
> > > However, if the newly active task generate watermarks which are
> > > smaller than but very closely to the last emitted one for a long time,
> > > it could not be took into consideration in watermark alignment which
> > > may lead to its data maybe dropped at the later window operator.
> > >
> > > Is there any way to solve this problem? Or could we add a
> > > configuration to define different watermark aligned behavior?
> > >
> > > Any suggestion is appreciated. Thanks a lot.
> > >
> > > Regards
> > > JING
>
>


Re: Could watermark could be took into consideration after the channel become active from idle at once?

2021-05-20 Thread 张静
Hi Roman,
   Thanks very much for the reply.
   In our case, we saw some data being dropped in a window operator. We
found the root cause by adding a temporary metric for the number of
aligned channels: an active channel resumed from idle was not taken
into account for some time (not always, btw). We could work around it
by allowing lateness on the window operator; however, I wonder whether
it makes sense to add a configuration to define different watermark
alignment behavior?

Regards
JING

Roman Khachatryan  wrote on Fri, May 21, 2021 at 1:05 AM:
>
> Hi,
>
> AFAIK, this behavior is not configurable.
> However, for this to happen the channel must consistently generate
> watermarks smaller than watermarks from ALL aligned channels (and its
> elements must have a smaller timestamp). I'm not sure how likely it
> is. Is it something you see in production?
>
> Regards,
> Roman
>
> On Thu, May 20, 2021 at 4:11 PM 张静  wrote:
> >
> > Hi community,
> > Now after a channel become active from idle, the watermark on this
> > channel would not be took into account when align watermarks util it
> > generates a watermark equals to or bigger than last emitted watermark.
> > It makes sense because it could prevent the newly active task resumed
> > from idle dragging down the entire job.
> >
> > However, if the newly active task generate watermarks which are
> > smaller than but very closely to the last emitted one for a long time,
> > it could not be took into consideration in watermark alignment which
> > may lead to its data maybe dropped at the later window operator.
> >
> > Is there any way to solve this problem? Or could we add a
> > configuration to define different watermark aligned behavior?
> >
> > Any suggestion is appreciated. Thanks a lot.
> >
> > Regards
> > JING


Re: ES sink never receive error code

2021-05-20 Thread Yangze Guo
> So, ES BulkProcessor retried after bulk request was partially rejected. And 
> eventually that request was sent successfully? That is why failure handler 
> was not called?

If the bulk request fails after the max number of retries
(bulk.flush.backoff.retries), the failure handler will still be
called.
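
For reference, a rough sketch of where these knobs live on the
org.apache.flink.streaming.connectors.elasticsearch6 builder (the document
type, host list and emitter below are placeholders):

ElasticsearchSink.Builder<MyDoc> builder =
        new ElasticsearchSink.Builder<>(httpHosts, new MyEmitter());

// retry rejected bulk items with exponential backoff before giving up
builder.setBulkFlushBackoff(true);
builder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
builder.setBulkFlushBackoffRetries(3);
builder.setBulkFlushBackoffDelay(1000);

// only reached once an item has finally failed (e.g. still 429 after all retries)
builder.setFailureHandler((action, failure, restStatusCode, indexer) -> {
    if (restStatusCode == 429) {
        // decide here: re-queue via the indexer, log, or rethrow
        throw failure;
    }
});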


Best,
Yangze Guo

On Fri, May 21, 2021 at 5:53 AM Qihua Yang  wrote:
>
> Thank you for the reply!
> Yes, we did config bulk.flush.backoff.enable.
> So, ES BulkProcessor retried after bulk request was partially rejected. And 
> eventually that request was sent successfully? That is why failure handler 
> was not called?
>
> Thanks,
> Qihua
>
> On Thu, May 20, 2021 at 2:22 PM Roman Khachatryan  wrote:
>>
>> Hi,
>>
>> Have you tried to change bulk.flush.backoff.enable?
>> According to the docs [1], the underlying ES BulkProcessor will retry
>> (by default), so the provided failure handler might not be called.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor
>>
>> Regards,
>> Roman
>>
>> On Thu, May 20, 2021 at 10:08 PM Qihua Yang  wrote:
>> >
>> > Hello,
>> > We are using flink-connector-elasticsearch6_2.11 to ingest stream data to 
>> > ES by using bulk requests. From ES metrics, we observed some bulk thread 
>> > pool rejections. Contacted AWS team, their explanation is part of bulk 
>> > request was rejected. Response body should include status for each item. 
>> > For bulk thread pool rejection, the error code is 429.
>> > Our flink app override FailureHandler to process error cases.
>> > I checked Flink code, it has AfterBulk() method to handle item errors. 
>> > FailureHandler() never received any 429 error.
>> > Is that flink issue? Or we need to config something to make it work?
>> > Thanks,
>> >
>> > Qihua


behavior differences when sinking side outputs to a custom KeyedCoProcessFunction

2021-05-20 Thread Jin Yi
hello,

sorry for a long post, but this is a puzzling problem and i am enough of a
flink non-expert to be unsure what details are important or not.

background:
i have a flink pipeline that is a series of custom "joins" for the purposes
of user event "flattening" that i wrote a custom KeyedCoProcessFunction
that either joins on a parent id between the two connected streams using
the "left" event's primary key and the foreign key on the right event OR if
the right (child) event doesn't have a foreign key, tries to infer the join
using heuristics to limit the possible parent events and grabbing the
temporally-closest one.  both the inference and state cleanup for these
joins are happening on the onTimer method.

everything is based on event time, and i'm using kafka connector input
source for the right event inputs to these operators.  here's what the
pipeline looks like, with the joins in question acting like a chain of
joins with the output of the previous join (left input) being joined with a
new raw event source (right input):

[image: Screen Shot 2021-05-20 at 3.12.22 PM.png]
these join functions have a time window/duration or interval associated
with them to define the duration of join state and inference window.  this
is set per operator to allow for in order and out of order join thresholds
for id based joins, and this window acts as the scope for inference when a
right event that is an inference candidate (missing foreign key id) is
about to be evicted from state.

problem:

i have things coded up with side outputs for duplicate, late and dropped
events.  the dropped events case is the one i am focusing on since events
that go unmatched are dropped when they are evicted from state.  only rhs
events are the ones being dropped, with rhs events w/ foreign keys dropped
when they go unmatched (late/no left arrival or no valid inference based
left event).  with a wide enough time duration setting for both in order
and out of order durations, everything gets matched.  however, when testing
things out, i observed (expectedly) that the number of dropped events increases the
tighter you make the join window based on these durations.  great, this
makes sense.  i wanted to get a better understanding for these durations'
impacts, so i wrote our integration/stress test case to focus on just id
key based joins to start on this.

further, to help observe the dropping characteristics, i connected the side
outputs to some s3 sinks to store these dropped events.  originally, these
dropped right events were output properly to the s3 output.  for the
integration/stress test setup, they start to appear with durations < 1
minute.

however, i observed that they didn't include the flink Context.timestamp
encoded in the event structure anywhere (the left events were already
setting the timestamp in the processElement1 method).  i wanted this
information to see how event time processing worked in practice.  so, i
made a similarly simple change to the processElement2 function to set the
timestamp on these right events as they came in.
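
(the change was essentially the mirror of what processElement1 already did; a
rough sketch, with the event type and setter name made up:)

@Override
public void processElement2(RightEvent right, Context ctx, Collector<OutEvent> out) throws Exception {
    // ctx.timestamp() is the event-time timestamp flink assigned to this record
    right.setFlinkTimestamp(ctx.timestamp());
    // ... existing join / state logic ...
}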

once i did this, things stopped dropping and everything joined, even if i
set the durations down to 1 second on either side.  wut?

i can comment out the single right hand side event timestamp setting code,
and get the dropped events (sans flink timestamp).  uncommenting this code
to put the timestamps back in, things fully match again.

what is going on?  it feels like a heisenberg effect with the "touched"
right events.

any help here would be greatly appreciated.





Re: ES sink never receive error code

2021-05-20 Thread Qihua Yang
Thank you for the reply!
Yes, we did configure bulk.flush.backoff.enable.
So the ES BulkProcessor retried after the bulk request was partially rejected, and
eventually that request was sent successfully? Is that why the failure handler
was not called?

Thanks,
Qihua

On Thu, May 20, 2021 at 2:22 PM Roman Khachatryan  wrote:

> Hi,
>
> Have you tried to change bulk.flush.backoff.enable?
> According to the docs [1], the underlying ES BulkProcessor will retry
> (by default), so the provided failure handler might not be called.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor
>
> Regards,
> Roman
>
> On Thu, May 20, 2021 at 10:08 PM Qihua Yang  wrote:
> >
> > Hello,
> > We are using flink-connector-elasticsearch6_2.11 to ingest stream data
> to ES by using bulk requests. From ES metrics, we observed some bulk thread
> pool rejections. Contacted AWS team, their explanation is part of bulk
> request was rejected. Response body should include status for each item.
> For bulk thread pool rejection, the error code is 429.
> > Our flink app override FailureHandler to process error cases.
> > I checked Flink code, it has AfterBulk() method to handle item errors.
> FailureHandler() never received any 429 error.
> > Is that flink issue? Or we need to config something to make it work?
> > Thanks,
> >
> > Qihua
>


Re: Issues with forwarding environment variables

2021-05-20 Thread Milind Vaidya
This is Java code. I have a Flink job running, and it tries to fetch
this variable at run time. I see the properties reflected in
the logs, as already mentioned, but they are not visible from the code.

On Thu, May 20, 2021 at 1:53 PM Roman Khachatryan  wrote:

> > private String serviceName = System.getenv("SERVICE_NAME");
> Is it a scala object? If so, it can be initialized before any
> properties are set.
> What happens if the variable/property is read later at run time?
>
> Regards,
> Roman
>
> On Thu, May 20, 2021 at 10:41 PM Milind Vaidya  wrote:
> >
> > here are the entries from taskmanager logs
> >
> > 2021-05-20 13:34:13,739 INFO
> org.apache.flink.configuration.GlobalConfiguration - Loading configuration
> property: env.java.opts.taskmanager,
> "-DSERVICE_NAME=hello-test,-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
> > 2021-05-20 13:34:13,740 INFO
> org.apache.flink.configuration.GlobalConfiguration - Loading configuration
> property: jobmanager.execution.failover-strategy, region
> > 2021-05-20 13:34:13,742 INFO
> org.apache.flink.configuration.GlobalConfiguration - Loading configuration
> property: containerized.taskmanager.env.SERVICE_NAME, "hello-test"
> > 2021-05-20 13:34:13,743 INFO
> org.apache.flink.configuration.GlobalConfiguration - Loading configuration
> property: containerized.master.env.SERVICE_NAME, "hello-test"
> >
> > But the error still persists
> >
> >
> > On Thu, May 20, 2021 at 1:20 PM Roman Khachatryan 
> wrote:
> >>
> >> Thanks, it should work. I've created a ticket to track the issue [1].
> >> Could you please specify Flink and Yarn versions you are using?
> >>
> >> You can also use properties (which don't depend on Yarn integration),
> >> for example like this:
> >> In flink-conf.yaml: env.java.opts.taskmanager: -DSERVICE_NAME=...
> >> In the application: System.getProperty("SERVICE_NAME");
> >>
> >> Regards,
> >> Roman
> >>
> >> On Thu, May 20, 2021 at 9:50 PM Milind Vaidya 
> wrote:
> >> >
> >> >
> >> > Hi Roman,
> >> >
> >> > I have added following lines to conf/flink-conf.yaml
> >> >
> >> > containerized.taskmanager.env.SERVICE_NAME: "test_service_name"
> >> > containerized.master.env.SERVICE_NAME: "test_service_name"
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > On Thu, May 20, 2021 at 12:30 PM Roman Khachatryan 
> wrote:
> >> >>
> >> >> Hi,
> >> >>
> >> >> Could you please share the relevant parts of your flink-conf.yaml?
> >> >>
> >> >> Regards,
> >> >> Roman
> >> >>
> >> >> On Thu, May 20, 2021 at 9:13 PM Milind Vaidya 
> wrote:
> >> >> >
> >> >> > Hi
> >> >> >
> >> >> > Need to forward a few env variables to Job and Task manager.
> >> >> > I am running jobs in Yarn cluster
> >> >> > I was referring to this : Forwarding
> >> >> >
> >> >> > I also found Stack Overflow
> >> >> >
> >> >> > I was able to configure and see the variables in Flink Dashboard
> >> >> >
> >> >> > But the task manager logs stills says
> >> >> >
> >> >> > `The system environment variable SERVICE_NAME is missing` as an
> exception message.
> >> >> >
> >> >> > The code trying to fetch it is as follows
> >> >> >
> >> >> > private String serviceName = System.getenv("SERVICE_NAME");
> >> >> >
> >> >> > Is the fetched one not the same as set one ? How to set / fetch
> environment variables in such case ?
> >> >> >
>


Re: ES sink never receive error code

2021-05-20 Thread Roman Khachatryan
Hi,

Have you tried to change bulk.flush.backoff.enable?
According to the docs [1], the underlying ES BulkProcessor will retry
(by default), so the provided failure handler might not be called.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor

Regards,
Roman

On Thu, May 20, 2021 at 10:08 PM Qihua Yang  wrote:
>
> Hello,
> We are using flink-connector-elasticsearch6_2.11 to ingest stream data to ES 
> by using bulk requests. From ES metrics, we observed some bulk thread pool 
> rejections. Contacted AWS team, their explanation is part of bulk request was 
> rejected. Response body should include status for each item. For bulk thread 
> pool rejection, the error code is 429.
> Our flink app override FailureHandler to process error cases.
> I checked Flink code, it has AfterBulk() method to handle item errors. 
> FailureHandler() never received any 429 error.
> Is that flink issue? Or we need to config something to make it work?
> Thanks,
>
> Qihua


Re: Issues with forwarding environment variables

2021-05-20 Thread Roman Khachatryan
> private String serviceName = System.getenv("SERVICE_NAME");
Is it a scala object? If so, it can be initialized before any
properties are set.
What happens if the variable/property is read later at run time?

Regards,
Roman

On Thu, May 20, 2021 at 10:41 PM Milind Vaidya  wrote:
>
> here are the entries from taskmanager logs
>
> 2021-05-20 13:34:13,739 INFO 
> org.apache.flink.configuration.GlobalConfiguration - Loading configuration 
> property: env.java.opts.taskmanager, 
> "-DSERVICE_NAME=hello-test,-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
> 2021-05-20 13:34:13,740 INFO 
> org.apache.flink.configuration.GlobalConfiguration - Loading configuration 
> property: jobmanager.execution.failover-strategy, region
> 2021-05-20 13:34:13,742 INFO 
> org.apache.flink.configuration.GlobalConfiguration - Loading configuration 
> property: containerized.taskmanager.env.SERVICE_NAME, "hello-test"
> 2021-05-20 13:34:13,743 INFO 
> org.apache.flink.configuration.GlobalConfiguration - Loading configuration 
> property: containerized.master.env.SERVICE_NAME, "hello-test"
>
> But the error still persists
>
>
> On Thu, May 20, 2021 at 1:20 PM Roman Khachatryan  wrote:
>>
>> Thanks, it should work. I've created a ticket to track the issue [1].
>> Could you please specify Flink and Yarn versions you are using?
>>
>> You can also use properties (which don't depend on Yarn integration),
>> for example like this:
>> In flink-conf.yaml: env.java.opts.taskmanager: -DSERVICE_NAME=...
>> In the application: System.getProperty("SERVICE_NAME");
>>
>> Regards,
>> Roman
>>
>> On Thu, May 20, 2021 at 9:50 PM Milind Vaidya  wrote:
>> >
>> >
>> > Hi Roman,
>> >
>> > I have added following lines to conf/flink-conf.yaml
>> >
>> > containerized.taskmanager.env.SERVICE_NAME: "test_service_name"
>> > containerized.master.env.SERVICE_NAME: "test_service_name"
>> >
>> >
>> >
>> >
>> >
>> > On Thu, May 20, 2021 at 12:30 PM Roman Khachatryan  
>> > wrote:
>> >>
>> >> Hi,
>> >>
>> >> Could you please share the relevant parts of your flink-conf.yaml?
>> >>
>> >> Regards,
>> >> Roman
>> >>
>> >> On Thu, May 20, 2021 at 9:13 PM Milind Vaidya  wrote:
>> >> >
>> >> > Hi
>> >> >
>> >> > Need to forward a few env variables to Job and Task manager.
>> >> > I am running jobs in Yarn cluster
>> >> > I was referring to this : Forwarding
>> >> >
>> >> > I also found Stack Overflow
>> >> >
>> >> > I was able to configure and see the variables in Flink Dashboard
>> >> >
>> >> > But the task manager logs stills says
>> >> >
>> >> > `The system environment variable SERVICE_NAME is missing` as an 
>> >> > exception message.
>> >> >
>> >> > The code trying to fetch it is as follows
>> >> >
>> >> > private String serviceName = System.getenv("SERVICE_NAME");
>> >> >
>> >> > Is the fetched one not the same as set one ? How to set / fetch 
>> >> > environment variables in such case ?
>> >> >


Re: Job recovery issues with state restoration

2021-05-20 Thread Roman Khachatryan
Hi Peter,

Do you experience this issue when running without local recovery or
incremental checkpoints enabled?
Or have you maybe compared the local (on TM) and remote (on DFS) SST files?
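
For reference, the corresponding flink-conf.yaml toggles for that experiment:

state.backend.local-recovery: false
state.backend.incremental: false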

Regards,
Roman

On Thu, May 20, 2021 at 5:54 PM Peter Westermann
 wrote:
>
> Hello,
>
>
>
> I’ve reported issues around checkpoint recovery in case of a job failure due 
> to zookeeper connection loss in the past. I am still seeing issues 
> occasionally.
>
> This is for Flink 1.12.3 with zookeeper for HA, S3 as the state backend, 
> incremental checkpoints, and task-local recovery enabled.
>
>
>
> Here’s what happened: A zookeeper instance was terminated as part of a 
> deployment for our zookeeper service, this caused a new jobmanager leader 
> election (so far so good). A leader was elected and the job was restarted 
> from the latest checkpoint but never became healthy. The root exception and 
> the logs show issues reading state:
>
> o.r.RocksDBException: Sst file size mismatch: 
> /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003579.sst.
>  Size recorded in manifest 36718, actual size 2570\
> Sst file size mismatch: 
> /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003573.sst.
>  Size recorded in manifest 13756, actual size 1307\
> Sst file size mismatch: 
> /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003575.sst.
>  Size recorded in manifest 16278, actual size 1138\
> Sst file size mismatch: 
> /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003576.sst.
>  Size recorded in manifest 23108, actual size 1267\
> Sst file size mismatch: 
> /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003577.sst.
>  Size recorded in manifest 148089, actual size 1293\
> \
> \\tat org.rocksdb.RocksDB.open(RocksDB.java)\
> \\tat org.rocksdb.RocksDB.open(RocksDB.java:286)\
> \\tat o.a.f.c.s.s.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:80)\
> \\t... 22 common frames omitted\
> Wrapped by: java.io.IOException: Error while opening RocksDB instance.\
> \\tat o.a.f.c.s.s.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:92)\
> \\tat 
> o.a.f.c.s.s.r.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:145)\
> \\tat 
> o.a.f.c.s.s.r.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOper...
>
>
>
> Since we retain multiple checkpoints, I tried redeploying the job from all 
> checkpoints that were still available. All those attempts lead to similar 
> failures. (I eventually had to use an older savepoint to recover the job.)
>
> Any guidance for avoiding this would be appreciated.
>
>
>
> Peter


Re: Issues with forwarding environment variables

2021-05-20 Thread Milind Vaidya
here are the entries from taskmanager logs

2021-05-20 13:34:13,739 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: env.java.opts.taskmanager, "-DSERVICE_NAME=hello-test,-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
2021-05-20 13:34:13,740 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.execution.failover-strategy, region
2021-05-20 13:34:13,742 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: containerized.taskmanager.env.SERVICE_NAME, "hello-test"
2021-05-20 13:34:13,743 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: containerized.master.env.SERVICE_NAME, "hello-test"

But the error still persists


On Thu, May 20, 2021 at 1:20 PM Roman Khachatryan  wrote:

> Thanks, it should work. I've created a ticket to track the issue [1].
> Could you please specify Flink and Yarn versions you are using?
>
> You can also use properties (which don't depend on Yarn integration),
> for example like this:
> In flink-conf.yaml: env.java.opts.taskmanager: -DSERVICE_NAME=...
> In the application: System.getProperty("SERVICE_NAME");
>
> Regards,
> Roman
>
> On Thu, May 20, 2021 at 9:50 PM Milind Vaidya  wrote:
> >
> >
> > Hi Roman,
> >
> > I have added following lines to conf/flink-conf.yaml
> >
> > containerized.taskmanager.env.SERVICE_NAME: "test_service_name"
> > containerized.master.env.SERVICE_NAME: "test_service_name"
> >
> >
> >
> >
> >
> > On Thu, May 20, 2021 at 12:30 PM Roman Khachatryan 
> wrote:
> >>
> >> Hi,
> >>
> >> Could you please share the relevant parts of your flink-conf.yaml?
> >>
> >> Regards,
> >> Roman
> >>
> >> On Thu, May 20, 2021 at 9:13 PM Milind Vaidya 
> wrote:
> >> >
> >> > Hi
> >> >
> >> > Need to forward a few env variables to Job and Task manager.
> >> > I am running jobs in Yarn cluster
> >> > I was referring to this : Forwarding
> >> >
> >> > I also found Stack Overflow
> >> >
> >> > I was able to configure and see the variables in Flink Dashboard
> >> >
> >> > But the task manager logs stills says
> >> >
> >> > `The system environment variable SERVICE_NAME is missing` as an
> exception message.
> >> >
> >> > The code trying to fetch it is as follows
> >> >
> >> > private String serviceName = System.getenv("SERVICE_NAME");
> >> >
> >> > Is the fetched one not the same as set one ? How to set / fetch
> environment variables in such case ?
> >> >
>


Re: Issues with forwarding environment variables

2021-05-20 Thread Roman Khachatryan
Thanks, it should work. I've created a ticket to track the issue [1].
Could you please specify Flink and Yarn versions you are using?

You can also use properties (which don't depend on Yarn integration),
for example like this:
In flink-conf.yaml: env.java.opts.taskmanager: -DSERVICE_NAME=...
In the application: System.getProperty("SERVICE_NAME");

Regards,
Roman

On Thu, May 20, 2021 at 9:50 PM Milind Vaidya  wrote:
>
>
> Hi Roman,
>
> I have added following lines to conf/flink-conf.yaml
>
> containerized.taskmanager.env.SERVICE_NAME: "test_service_name"
> containerized.master.env.SERVICE_NAME: "test_service_name"
>
>
>
>
>
> On Thu, May 20, 2021 at 12:30 PM Roman Khachatryan  wrote:
>>
>> Hi,
>>
>> Could you please share the relevant parts of your flink-conf.yaml?
>>
>> Regards,
>> Roman
>>
>> On Thu, May 20, 2021 at 9:13 PM Milind Vaidya  wrote:
>> >
>> > Hi
>> >
>> > Need to forward a few env variables to Job and Task manager.
>> > I am running jobs in Yarn cluster
>> > I was referring to this : Forwarding
>> >
>> > I also found Stack Overflow
>> >
>> > I was able to configure and see the variables in Flink Dashboard
>> >
>> > But the task manager logs stills says
>> >
>> > `The system environment variable SERVICE_NAME is missing` as an exception 
>> > message.
>> >
>> > The code trying to fetch it is as follows
>> >
>> > private String serviceName = System.getenv("SERVICE_NAME");
>> >
>> > Is the fetched one not the same as set one ? How to set / fetch 
>> > environment variables in such case ?
>> >


ES sink never receive error code

2021-05-20 Thread Qihua Yang
Hello,
We are using flink-connector-elasticsearch6_2.11 to ingest stream data to
ES by using bulk requests. From ES metrics, we observed some bulk thread
pool rejections. Contacted AWS team, their explanation is part of bulk
request was rejected. Response body should include status for each item.
For bulk thread pool rejection, the error code is 429.
Our flink app overrides the FailureHandler to process error cases.
I checked the Flink code; it has an afterBulk() method to handle item errors.
The FailureHandler() never received any 429 error.
Is that flink issue? Or we need to config something to make it work?
Thanks,

Qihua


Re: Issues with forwarding environment variables

2021-05-20 Thread Milind Vaidya
Hi Roman,

I have added following lines to conf/flink-conf.yaml

containerized.taskmanager.env.SERVICE_NAME: "test_service_name"
containerized.master.env.SERVICE_NAME: "test_service_name"





On Thu, May 20, 2021 at 12:30 PM Roman Khachatryan  wrote:

> Hi,
>
> Could you please share the relevant parts of your flink-conf.yaml?
>
> Regards,
> Roman
>
> On Thu, May 20, 2021 at 9:13 PM Milind Vaidya  wrote:
> >
> > Hi
> >
> > Need to forward a few env variables to Job and Task manager.
> > I am running jobs in Yarn cluster
> > I was referring to this : Forwarding
> >
> > I also found Stack Overflow
> >
> > I was able to configure and see the variables in Flink Dashboard
> >
> > But the task manager logs stills says
> >
> > `The system environment variable SERVICE_NAME is missing` as an
> exception message.
> >
> > The code trying to fetch it is as follows
> >
> > private String serviceName = System.getenv("SERVICE_NAME");
> >
> > Is the fetched one not the same as set one ? How to set / fetch
> environment variables in such case ?
> >
>


Re: Issues with forwarding environment variables

2021-05-20 Thread Roman Khachatryan
Hi,

Could you please share the relevant parts of your flink-conf.yaml?

Regards,
Roman

On Thu, May 20, 2021 at 9:13 PM Milind Vaidya  wrote:
>
> Hi
>
> Need to forward a few env variables to Job and Task manager.
> I am running jobs in Yarn cluster
> I was referring to this : Forwarding
>
> I also found Stack Overflow
>
> I was able to configure and see the variables in Flink Dashboard
>
> But the task manager logs stills says
>
> `The system environment variable SERVICE_NAME is missing` as an exception 
> message.
>
> The code trying to fetch it is as follows
>
> private String serviceName = System.getenv("SERVICE_NAME");
>
> Is the fetched one not the same as set one ? How to set / fetch environment 
> variables in such case ?
>


Re: Best practice for adding support for Kafka variants

2021-05-20 Thread Roman Khachatryan
Hi,

Those classes will likely be deprecated in the future in favor of
FLIP-27 [1][2] source and FLIP-143 [3] sink implementations and
eventually removed (though it won't happen soon).
You probably should take a look at the above new APIs.

Either way, there is no such recommendation AFAIK. Copied connector
classes will have to be updated if something in Flink changes. Maybe a
better way would be to build your own kafka client and use it to build
the flink-kafka connector (by overriding ${kafka.version}, for example).

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
[2]
https://issues.apache.org/jira/browse/FLINK-18323
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
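
For reference, a minimal sketch of what the FLIP-27 Kafka source looks like
from the user side (Flink 1.13-era flink-connector-kafka; broker, topic and
group id are placeholders):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("my-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");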

Regards,
Roman

On Thu, May 20, 2021 at 7:45 PM deepthi Sridharan
 wrote:
>
> Hi,
>
> We have an internal version of Open source Kafka consumer and producer that 
> we use and are working on adding that as a source and sink for flink.
>
> It seems like the easiest way to add the consumer as source would be to 
> override the FlinkKafkaConsumer class's createFetcher method to provide our 
> own derived class of KafkaFetcher class which can hookup its own version of 
> the consumerThread. But the fetcher classes are annotated as Internal and 
> seems like it is not meant to be used this way. (And the changes for Producer 
> would be on similar lines).
>
> Is there a recommendation for how to add new flavors of Kafka 
> Consumer/Producer from the community? Would it be recommended to maintain a 
> copy of all the connector classes so we don't have to deal with changes to 
> classes tagged as internal?
>
> --
> Thanks & Regards
>


Issues with forwarding environment variables

2021-05-20 Thread Milind Vaidya
Hi

Need to forward a few env variables to Job and Task manager.
I am running jobs in Yarn cluster
I was referring to this : Forwarding


I also found Stack Overflow


I was able to configure and see the variables in Flink Dashboard

But the task manager logs still say

*`The system environment variable SERVICE_NAME is missing`* as an exception
message.

The code trying to fetch it is as follows

*private String serviceName = System.getenv("SERVICE_NAME");*

Is the fetched one not the same as set one ? How to set / fetch environment
variables in such case ?


Best practice for adding support for Kafka variants

2021-05-20 Thread deepthi Sridharan
Hi,

We have an internal version of Open source Kafka consumer and producer that
we use and are working on adding that as a source and sink for flink.

It seems like the easiest way to add the consumer as source would be to
override the FlinkKafkaConsumer class's createFetcher

method to provide our own derived class of KafkaFetcher

class which can hook up its own version of the consumerThread. But the
fetcher classes are annotated as Internal, and it seems like they are not meant to
be used this way. (And the changes for the Producer would be along similar lines.)

Is there a recommendation for how to add new flavors of Kafka
Consumer/Producer from the community? Would it be recommended to maintain a
copy of all the connector classes so we don't have to deal with changes to
classes tagged as internal?

-- 
Thanks & Regards


Re: Could watermark could be took into consideration after the channel become active from idle at once?

2021-05-20 Thread Roman Khachatryan
Hi,

AFAIK, this behavior is not configurable.
However, for this to happen the channel must consistently generate
watermarks smaller than watermarks from ALL aligned channels (and its
elements must have a smaller timestamp). I'm not sure how likely it
is. Is it something you see in production?

Regards,
Roman

On Thu, May 20, 2021 at 4:11 PM 张静  wrote:
>
> Hi community,
> Now after a channel become active from idle, the watermark on this
> channel would not be took into account when align watermarks util it
> generates a watermark equals to or bigger than last emitted watermark.
> It makes sense because it could prevent the newly active task resumed
> from idle dragging down the entire job.
>
> However, if the newly active task generate watermarks which are
> smaller than but very closely to the last emitted one for a long time,
> it could not be took into consideration in watermark alignment which
> may lead to its data maybe dropped at the later window operator.
>
> Is there any way to solve this problem? Or could we add a
> configuration to define different watermark aligned behavior?
>
> Any suggestion is appreciated. Thanks a lot.
>
> Regards
> JING


Re: Parallelism in Production: Best Practices

2021-05-20 Thread Yaroslav Tkachenko
Hi Jan, thanks for sharing this!

Just wanted to confirm: this approach works because of the task slot
sharing feature in Flink, doesn't it?

On Thu, May 20, 2021 at 1:12 AM Jan Brusch 
wrote:

>
> Hi Yaroslav,
>
> here's a fourth option that we usually use: We set the default
> parallelism once when we initially deploy the app (maybe change it a few
> times in the beginning). From that point on rescale by either resizing
> the TaskManager-Nodes or redistributing the parallelism over more / less
> TaskManager-Nodes.
>
> For example: We start to run an app with a default parallelism of 64 and
> we initially distribute this over 16 TaskManager-Nodes with 4 taskSlots
> each. Then we see that we have scaled way too high for the actual
> workload. We now have two options: Either reduce the hardware on the 16
> Nodes (CPU and RAM) or re-scale horizontally by re-distributing the
> workload over 8 TaskManager-Nodes with 8 taskSlots each.
>
> Since we leave the parallelism of the Job untouched in each case, we can
> easily rescale by re-deploying the whole cluster and let it resume from
> the last checkpoint. A cleaner way would probably be to do this
> re-deployment with explicit savepoints.
>
> We are doing this in kubernetes where both scaling options are really
> easy to carry out. But the same concepts should work on any other setup,
> too.
>
>
> Hope that helps
>
> Jan
>
> On 19.05.21 20:00, Yaroslav Tkachenko wrote:
> > Hi everyone,
> >
> > I'd love to learn more about how different companies approach
> > specifying Flink parallelism. I'm specifically interested in real,
> > production workloads.
> >
> > I can see a few common patterns:
> >
> > - Rely on default parallelism, scale by changing parallelism for the
> > whole pipeline. I guess it only works if the pipeline doesn't have
> > obvious bottlenecks. Also, it looks like the new reactive mode makes
> > specifying parallelism for an operator obsolete
> > (
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/elastic_scaling/#configuration
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/elastic_scaling/#configuration
> >)
> >
> > - Rely on default parallelism for most of the operators, but override
> > it for some. For example, it doesn't make sense for a Kafka source to
> > have parallelism higher than the number of partitions it consumes.
> > Some custom sinks could choose lower parallelism to avoid overloading
> > their destinations. Some transformation steps could choose higher
> > parallelism to distribute the work better, etc.
> >
> > - Don't rely on default parallelism and configure parallelism
> > explicitly for each operator. This requires very good knowledge of
> > each operator in the pipeline, but it could lead to very good
> performance.
> >
> > Is there a different pattern that I miss? What do you use? Feel free
> > to share any resources.
> >
> > If you do specify it explicitly, what do you think about the reactive
> > mode? Will you use it?
> >
> > Also, how often do you change parallelism? Do you set it once and
> > forget once the pipeline is stable? Do you keep re-evaluating it?
> >
> > Thanks.
>
> --
> neuland  – Büro für Informatik GmbH
> Konsul-Smidt-Str. 8g, 28217 Bremen
>
> Telefon (0421) 380107 57
> Fax (0421) 380107 99
> https://www.neuland-bfi.de
>
> https://twitter.com/neuland
> https://facebook.com/neulandbfi
> https://xing.com/company/neulandbfi
>
>
> Geschäftsführer: Thomas Gebauer, Jan Zander
> Registergericht: Amtsgericht Bremen, HRB 23395 HB
> USt-ID. DE 246585501
>
>


PyFlink DataStream union type mismatch

2021-05-20 Thread Wouter Zorgdrager
Dear all,

I'm having trouble unifying two data streams using the union operator in
PyFlink. My code basically looks like this:

init_stream = (operator_stream
    .filter(lambda r: r[0] is None)
    .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
    .key_by(lambda x: x[0], Types.STRING())
)

stateful_operator_stream = (operator_stream
    .filter(lambda r: r[0] is not None)
    .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
    .key_by(lambda x: x[0], Types.STRING())
)

print(init_stream)
print(init_stream.get_type())

print(stateful_operator_stream.get_type())
print(stateful_operator_stream)

final_operator_stream = (init_stream
    .union(stateful_operator_stream)
    .process(stateful_operator)
)


In short, I have a datastream (operator_stream) of type Tuple[str,
Event], which I define as a tuple of Types.STRING() and
Types.PICKLED_BYTE_ARRAY().
When calling the union operator, I get an error which shows a type
mismatch between both streams:

py4j.protocol.Py4JJavaError: An error occurred while calling o732.union.
: java.lang.IllegalArgumentException: Cannot union streams of
different types: Java Tuple2 and
Row(f0: String, f1: Java Tuple2)
at 
org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:238)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
However, when I print the types of both datastreams they seem similar:


RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))

Any thoughts? Thanks in advance!

Regards,
Wouter


Job recovery issues with state restoration

2021-05-20 Thread Peter Westermann
Hello,

I’ve reported issues around checkpoint recovery in case of a job failure due to 
zookeeper connection loss in the past. I am still seeing issues occasionally.
This is for Flink 1.12.3 with zookeeper for HA, S3 as the state backend, 
incremental checkpoints, and task-local recovery enabled.

Here’s what happened: A zookeeper instance was terminated as part of a 
deployment for our zookeeper service, this caused a new jobmanager leader 
election (so far so good). A leader was elected and the job was restarted from 
the latest checkpoint but never became healthy. The root exception and the logs 
show issues reading state:
o.r.RocksDBException: Sst file size mismatch: 
/mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003579.sst.
 Size recorded in manifest 36718, actual size 2570\
Sst file size mismatch: 
/mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003573.sst.
 Size recorded in manifest 13756, actual size 1307\
Sst file size mismatch: 
/mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003575.sst.
 Size recorded in manifest 16278, actual size 1138\
Sst file size mismatch: 
/mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003576.sst.
 Size recorded in manifest 23108, actual size 1267\
Sst file size mismatch: 
/mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003577.sst.
 Size recorded in manifest 148089, actual size 1293\
\
\\tat org.rocksdb.RocksDB.open(RocksDB.java)\
\\tat org.rocksdb.RocksDB.open(RocksDB.java:286)\
\\tat o.a.f.c.s.s.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:80)\
\\t... 22 common frames omitted\
Wrapped by: java.io.IOException: Error while opening RocksDB instance.\
\\tat o.a.f.c.s.s.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:92)\
\\tat 
o.a.f.c.s.s.r.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:145)\
\\tat 
o.a.f.c.s.s.r.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOper...

Since we retain multiple checkpoints, I tried redeploying the job from all 
checkpoints that were still available. All those attempts lead to similar 
failures. (I eventually had to use an older savepoint to recover the job.)
Any guidance for avoiding this would be appreciated.

Peter


Re: [Statefun] Truncated Messages in Python workers

2021-05-20 Thread Stephan Ewen
Thanks for reporting this, it looks indeed like a potential bug.

I filed this Jira for it: https://issues.apache.org/jira/browse/FLINK-22729

Could you share (here or in Jira) what the stack on the Python worker side
is (for example, which HTTP server)? Do you know if the message truncation
happens reliably at a certain message size?


On Wed, May 19, 2021 at 2:12 PM Jan Brusch 
wrote:

> Hi,
>
> recently we started seeing the following faulty behaviour in the Flink
> Stateful Functions HTTP communication towards external Python workers.
> This is only occuring when the system is under heavy load.
>
> The Java Application will send HTTP Messages to an external Python
> Function but the external Function fails to parse the message with a
> "Truncated Message Error". Printouts show that the truncated message
> looks as follows:
>
> --
>
> 
>
> my.protobuf.MyClass: 
>
> my.protobuf.MyClass: 
>
> my.protobuf.MyClass: 
>
> my.protobuf.MyClass: 
> --
>
> Which leads to the following Error in the Python worker:
>
> --
>
> Error Parsing Message: Truncated Message
>
> --
>
> Either the sender or the receiver (or something in between) seems to be
> truncacting some (not all) messages at some random point in the payload.
> The source code in both Flink SDKs looks to be correct. We temporarily
> solved this by setting the "maxNumBatchRequests" parameter in the
> external function definition really low. But this is not an ideal
> solution as we believe this adds considerable communication overhead
> between the Java and the Python Functions.
>
> The Stateful Function version is 2.2.2, java8. The Java App as well as
> the external Python workers are deployed in the same kubernetes cluster.
>
>
> Has anyone ever seen this problem before?
>
> Best regards
>
> Jan
>
> --
> neuland  – Büro für Informatik GmbH
> Konsul-Smidt-Str. 8g, 28217 Bremen
>
> Telefon (0421) 380107 57
> Fax (0421) 380107 99
> https://www.neuland-bfi.de
>
> https://twitter.com/neuland
> https://facebook.com/neulandbfi
> https://xing.com/company/neulandbfi
>
>
> Geschäftsführer: Thomas Gebauer, Jan Zander
> Registergericht: Amtsgericht Bremen, HRB 23395 HB
> USt-ID. DE 246585501
>
>


Re: Savepoint/checkpoint confusion

2021-05-20 Thread Igor Basov
Hey Robert,
Thanks for the answer! But then I guess the only difference between
savepoints and checkpoints is that checkpoints are structurally state
dependent and can be incremental, but otherwise they are functionally
equivalent. So functionally a savepoint can be considered a full checkpoint
that provides two additional benefits: it is made on demand, and the state
backend can be changed (since 1.13). Is this correct?
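
As an aside, the per-operator UUIDs mentioned in Robert's link [1] are
assigned roughly like this (the function and id names are just placeholders):

stream
    .keyBy(r -> r.key)
    .process(new MyKeyedProcessFunction()).uid("my-process-operator")
    .addSink(new MySink()).uid("my-sink");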

On Thu, 20 May 2021 at 05:35, Robert Metzger  wrote:

> Hey Igor,
>
> 1) yes, reactive mode indeed does the same.
> 2) No, HA mode is only storing some metadata in ZK about the leadership
> and latest checkpoints, but the checkpoints itself are the same. They
> should be usable for a changed job graph (if the state matches the
> operators by setting the UUIDs [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/#set-uuids-for-all-operators
>
>
> On Fri, May 7, 2021 at 10:13 PM Igor Basov  wrote:
>
>> Hello,
>> I got confused about usage of savepoints and checkpoints in different
>> scenarios.
>> I understand that checkpoints' main purpose is fault tolerance, they are
>> more lightweight and don't support changing job graph, parallelism or state
>> backend when restoring from them, as mentioned in the latest 1.13 docs:
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#difference-to-savepoints
>>
>> At the same time:
>> 1) Reactive scaling mode (in 1.13) uses checkpoints exactly for that -
>> rescaling.
>> 2) There are use cases like here:
>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/What-happens-when-a-job-is-rescaled-td39462.html
>> where people seem to be using retained checkpoints instead of savepoints
>> to do manual job restarts with rescaling.
>> 3) There are claims like here:
>>
>> https://lists.apache.org/thread.html/4299518f4da2810aa88fe6b21f841880b619f3f8ac264084a318c034%40%3Cuser.flink.apache.org%3E
>> that in HA setup JobManager is able to restart from a checkpoint even if
>> operators are added/removed or parallelism is changed (in this case I'm not
>> sure if the checkpoints used by HA JM in `high-availability.storageDir` is
>> the same thing as usual checkpoints).
>>
>> So I guess the questions are:
>> 1) Can retained checkpoints be safely used for manual restarting and
>> rescaling a job?
>> 2) Are checkpoints made by HA JM structurally different from the usual
>> ones? Can they be used to restore a job with a changed job graph?
>>
>> Thank you,
>> Igor
>>
>>
>


Could the watermark be taken into consideration as soon as the channel becomes active from idle?

2021-05-20 Thread 张静
Hi community,
Now, after a channel becomes active again from being idle, the watermark on
this channel is not taken into account during watermark alignment until it
generates a watermark equal to or bigger than the last emitted watermark.
This makes sense because it prevents a newly active task that has resumed
from idle from dragging down the entire job.

However, if the newly active task generates watermarks which are smaller
than, but very close to, the last emitted one for a long time, it will not
be taken into consideration in watermark alignment, which may lead to its
data being dropped at a later window operator.

Is there any way to solve this problem? Or could we add a configuration to
define a different watermark alignment behavior?
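
For reference, the only related knob I am aware of today is the per-source
idleness setting below (a Java DataStream sketch; it controls when a channel
is marked idle, but as far as I can tell not how the channel is treated once
it becomes active again):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class IdlenessExample {
    public static WatermarkStrategy<String> strategy() {
        return WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // a channel with no records for 1 minute is marked idle
                // and stops holding back watermark alignment
                .withIdleness(Duration.ofMinutes(1));
    }
}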

Any suggestion is appreciated. Thanks a lot.

Regards
JING


Issue with using siddhi extension function with flink

2021-05-20 Thread Dipanjan Mazumder
Hi,

   I am trying to integrate Siddhi with Flink. When I deploy the job in the Flink
cluster and try to use a Siddhi extension function, it is not able to find
those libraries at runtime, so I had to explicitly put those libraries into the
/opt/flink/lib folder for the JobManager and TaskManager. The fat jar of the Flink
job application has those libraries, but it cannot identify those extension
functions at runtime, and putting them into the lib folder is not a feasible
choice. Can you give some pointers on this problem? Thanks in advance.




I have tried multiple ways to load the classes, using Class.forName etc., but
nothing works even if the fat jar for the Flink job application has the Siddhi
extensions in it. I don't want to add those libraries to the JobManager's and
TaskManagers' lib folder every time.




Any help will be appreciated.




Regards

Dipanjan


Re: Unable to deserialize Avro data using Pyflink

2021-05-20 Thread Zerah J
Hi Dian,

Thanks for your support.

I could deserialize the Confluent Avro data using
ConfluentRegistryAvroDeserializationSchema, but since the GenericRecord
returned by ConfluentRegistryAvroDeserializationSchema is not supported in
PyFlink currently, I am unable to proceed.

I can print the datastream using ds.print(). Below is the result:
3> {"name": "abc", "id": "123"}
3> {"name": "cde", "id": "456"}


Apart from this, none of the transformations are working.

Caused by: java.lang.UnsupportedOperationException: The type information:
GenericRecord("{"type":"record","name":"Employee_Details","namespace":"com.employee","doc":"Employee
Details Value
Schema.","fields":[{"name":"name","type":"string","doc":"String
Value"},{"name":"id","type":"string","doc":"String Value"}]}") is not
supported in PyFlink currently.


Is there a way to convert these GenericRecords returned
by ConfluentRegistryAvroDeserializationSchema into Flink rows, like the
existing AvroRowDeserializationSchema returns?
Or please suggest any other way by which I can perform transformations and
write the data to a Kafka topic.
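
For example, would a small Java map step between the source and the rest of
the pipeline be the recommended way? A rough, untested sketch of what I have
in mind, using the two fields from the Employee_Details schema above:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.types.Row;

// Converts the GenericRecord produced by ConfluentRegistryAvroDeserializationSchema
// into a Flink Row with the two fields of the Employee_Details schema.
public class EmployeeToRow implements MapFunction<GenericRecord, Row> {
    @Override
    public Row map(GenericRecord record) {
        Object name = record.get("name");
        Object id = record.get("id");
        // Avro strings are Utf8 instances, so convert them explicitly
        return Row.of(name == null ? null : name.toString(),
                      id == null ? null : id.toString());
    }
}

The resulting stream would presumably still need explicit row type
information on the Java side, e.g. .returns(Types.ROW(Types.STRING, Types.STRING)).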

Regards,
Zerah

On Wed, May 19, 2021 at 7:13 PM Zerah J  wrote:

> Thanks Dian. It worked for me
>
> Regards,
> Zerah
>
> On Wed, May 19, 2021, 5:14 PM Dian Fu  wrote:
>
>> Hi Zerah,
>>
>> You could try to replace
>> ```
>> value_schema = avro.schema.parse()
>> ```
>>
>> with the following code:
>> ```
>> JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
>> value_schema = JSchemaParser().parse(value_schema_str)
>> ```
>>
>> The reason is that ```value_schema = avro.schema.parse(<schema string goes here>)``` will create a Python object instead of a Java object.
>>
>> Regards,
>> Dian
>>
>> On May 19, 2021, at 5:23 PM, Zerah J wrote:
>>
>> Hi Dian,
>>
>> Type of value_schema is <*class 'avro.schema.RecordSchema*'>
>>
>> I have only a Json schema string and schema registry url. Please find
>> below snippet :
>>
>> import avro.schema
>>
>> value_schema_str = """
>> {
>>   "namespace": "com.nextgen.customer",
>>   "type": "record",
>>   "name": "employee",
>>   "doc": "Customer Details Value Schema.",
>>   "fields": [
>> {
>>   "doc": "String Value",
>>   "name": "emp_name",
>>   "type": "string"
>> },
>> {
>>   "doc": "String Value",
>>   "name": "emp_id",
>>   "type": "string"
>> }
>>   ]
>> }
>> value_schema = avro.schema.parse(value_schema_str)
>> schema_url = "http://host:port";
>>
>>
>> How can I create Java Schema object from this schema string and pass it
>> from python method ?
>>
>>
>> Regards,
>> Zerah
>>
>>
>> On Wed, May 19, 2021 at 1:57 PM Dian Fu  wrote:
>>
>>> Hi Zerah,
>>>
>>> What’s the type of value_schema? It should be a Java object of type
>>> Schema. From the exception, it seems that it’s a class instead of object.
>>> Is this true?
>>>
>>> Regards,
>>> Dian
>>>
>>> On May 19, 2021, at 3:41 PM, Zerah J wrote:
>>>
>>> Hi Dian,
>>>
>>> Thanks for your suggestion.
>>>
>>> I tried to invoke  ConfluentRegistryAvroDeserializationSchema.forGeneric
>>> method from Python. But it's not working. Kindly check the code snippet
>>> below :
>>>
>>> class MyAvroRowDeserializationSchema(DeserializationSchema):
>>>
>>>     def __init__(self, record_class: str = None,
>>>                  avro_schema_string: schema = None, url: str = None):
>>>         JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
>>>             .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
>>>         j_deserialization_schema = \
>>>             JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>>
>>>         super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
>>>
>>>
>>> FlinkKafkaConsumer is now invoked as below using
>>> MyAvroRowDeserializationSchema :
>>>
>>> value_schema = avro.schema.parse()
>>> schema_url = "http://host:port"
>>> deserialization_schema = MyAvroRowDeserializationSchema(
>>>     avro_schema_string=value_schema, url=schema_url)
>>> kafka_source = FlinkKafkaConsumer(
>>>     topics='my_topic',
>>>     deserialization_schema=deserialization_schema,
>>>     properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
>>>
>>> I'm getting the below error :
>>>
>>> Traceback (most recent call last):
>>>   File "flinkKafkaAvro.py", line 70, in datastream_api_demo
>>> deserialization_schema =
>>> MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
>>>   File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
>>> j_deserialization_schema =
>>> JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>>   File
>>> "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line
>>> 1277, in __call__
>>> args_command, temp_args = self._build_args(*args)
>>>   File
>>> "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line
>>> 1247, in _build_args
>>>  

Kafka dynamic topic for Sink in SQL

2021-05-20 Thread Benoît Paris
Hi all!

I'm looking for a way to write to different Kafka topics based on some
column value in SQL.

I think it's possible with Java, using KafkaSerializationSchema,
and ProducerRecord(topic, ...), but I was wondering if I could somewhat
access that feature in SQL.
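
On the DataStream side I mean roughly the following (an untested sketch that
routes each (topic, payload) tuple to the topic named in its first field):

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RoutingSerializationSchema
        implements KafkaSerializationSchema<Tuple2<String, String>> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> element, Long timestamp) {
        // f0 is the target topic, f1 the serialized payload
        return new ProducerRecord<>(element.f0, element.f1.getBytes(StandardCharsets.UTF_8));
    }
}

This can be handed to a FlinkKafkaProducer together with a default topic, but
I'd prefer not to leave SQL just for the routing.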

I'm also trying to evaluate the amount of work required to implement it
myself, subclassing the Kafka SQL connector just to add that feature.

Another alternative for me is to try to preprocess the SQL, detect Kafka
Sinks, force a DataStream conversion, then replace the Kafka SQL sink with
an equivalent DataStream sink that has the topic routing (but this feels rather
brittle and hard to maintain to me, compared to having the option in the
SQL sink configuration).

All comments/opinions/advice welcome!
Cheers
Ben


Re: Side outputs PyFlink

2021-05-20 Thread Dian Fu
Hi Wouter,

You are right that side output is still not supported in PyFlink. It's definitely
one of the features we want to support in the next release.

For now, the workaround you mentioned is also what I have in mind.
Personally, I think if the filter itself is cheap enough, it will not
affect the overall performance too much.

Regards,
Dian

> 2021年5月20日 下午5:15,Wouter Zorgdrager  写道:
> 
> Dear Flink community,
> 
> First of all, I'm very excited about the new 1.13 release. Among other 
> features, I'm particularly excited about the support of stateful operations 
> in Python. I think it will make the wonders of stream processing and the 
> power of Flink accessible to more developers. 
> 
> I'm currently playing around a bit with these new features and I was 
> wondering if there are already plans to support side output in the Python 
> API? This already works pretty neatly in the DataStream API but couldn't find 
> any communication on adding this to PyFlink. 
> 
> In the meantime, what do you suggest for a workaround on side outputs? 
> Intuitively, I would copy a stream and add a filter for each side output but 
> this seems a bit inefficient. In that setup, each side output will need to go 
> over the complete stream. Any ideas?
> 
> Thanks in advance!
> Regards,
> Wouter



Re: Issue reading from S3

2021-05-20 Thread Yun Gao
Hi Angelo,

I tried the failing case provided with a similar one:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();

TableEnvironment t_env = TableEnvironment.create(settings);

t_env.getConfig().getConfiguration().setString("parallelism.default", "1");

t_env.executeSql("CREATE TABLE example (  `url` STRING) WITH ( 'connector' = 'filesystem', 'path' = 's3a://whatnamedoyouwant/links', 'format' = 'raw')");

Table t1 = t_env.from("example");

t1.execute().print();

However, it seems the job could be executed successfully. 

I further tried with the configuration, and found that the exception
is thrown if there is no s3a.access-key or s3a.secret-key
configured. Could you have a look at whether the two configuration items
are effective?

Also, I only configured s3a.path-style: true, s3a.access-key and
s3a.secret-key. Is it possible to remove the other configuration items
and have a try?
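
For reference, those three items would look roughly like this in
flink-conf.yaml (plus an endpoint entry if a non-AWS S3 service is used;
the values are placeholders):

s3a.endpoint: http://<your-s3-endpoint>
s3a.path-style: true
s3a.access-key: <access-key>
s3a.secret-key: <secret-key>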

Best,
Yun




 --Original Mail --
Sender:Angelo G. 
Send Date:Wed May 19 00:24:42 2021
Recipients:Flink User Mail List 
Subject:Issue reading from S3

Hi,

I'm trying to read from and write to S3 with Flink 1.12.2. I'm submitting the 
job to a local cluster (tar.gz distribution). I do not have a Hadoop installation 
running in the same machine. S3 (not Amazon) is running in a remote location 
and I have access to it via endpoint and access/secret keys.

The issue is that I'm able to read and write from and to S3 when using 
StreamExecutionEnvironment.readTextFile and DataStrean.writeAsText methods but 
I can't read from S3 when using the table API.

This is the application:

package org.apache.flink;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ReadTables {

    public static void main(String[] args) throws Exception {

        // CLASSIC API (PLAIN TEXT)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.readTextFile("s3a://bucket/source.txt");
        ds.writeAsText("s3a://bucket/dest.txt", FileSystem.WriteMode.OVERWRITE);

        env.execute();

        // TABLE API
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();
        TableEnvironment t_env = TableEnvironment.create(settings);
        t_env.getConfig().getConfiguration().setString("parallelism.default", "1");

        t_env.executeSql("CREATE TABLE example (  `date` STRING,  `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 's3a://bucket/xxx/yyy/', 'format' = 'parquet')");

        Table t1 = t_env.from("example");
        t1.execute().print();
    }
}

The first job works properly, reading the source.txt file and writing it to 
dest.txt.

The second job does not work:

$~/flink-1.12.2$ ./bin/flink run -Dexecution.runtime-mode=BATCH -c 
org.apache.flink.ReadTables flink-s3-1.0-SNAPSHOT.jar;

Job has been submitted with JobID c690faed0051d1501d5b9747b56f
Program execution finished
Job with JobID c690faed0051d1501d5b9747b56f has finished.
Job Runtime: 17358 ms

Job has been submitted with JobID ebe54017faa83af33923d50892283e11
++-+
|   date |   value |
++-+


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Failed to fetch next result
Caused by: java.lang.RuntimeException: Failed to fetch next result
Caused by: java.io.IOException: Failed to fetch job execution result
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
exception
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
unexpected exception while polling the records
Caused by: java.net.SocketTimeoutException: doesBucketExist on 
scib-des-cm-fipoac-medusa: com.amazonaws.AmazonClientException: No AWS 
Credentials provided by BasicAWSCredentialsProvider 
EnvironmentVariableCredentialsProvide

Re: Savepoint/checkpoint confusion

2021-05-20 Thread Robert Metzger
Hey Igor,

1) yes, reactive mode indeed does the same.
2) No, HA mode only stores some metadata in ZK about the leadership and the
latest checkpoints, but the checkpoints themselves are the same. They should be
usable for a changed job graph (if the state matches the operators by
setting the UUIDs [1]).

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/#set-uuids-for-all-operators
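
Setting the UIDs is a one-liner per operator; a minimal sketch (the operators
here are just placeholders):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3, 4)
                .uid("numbers-source")   // stable ID so state can be re-matched after graph changes
                .filter(x -> x % 2 == 0)
                .uid("even-filter")
                .print();

        env.execute("uid-example");
    }
}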


On Fri, May 7, 2021 at 10:13 PM Igor Basov  wrote:

> Hello,
> I got confused about usage of savepoints and checkpoints in different
> scenarios.
> I understand that checkpoints' main purpose is fault tolerance, they are
> more lightweight and don't support changing job graph, parallelism or state
> backend when restoring from them, as mentioned in the latest 1.13 docs:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#difference-to-savepoints
>
> At the same time:
> 1) Reactive scaling mode (in 1.13) uses checkpoints exactly for that -
> rescaling.
> 2) There are use cases like here:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/What-happens-when-a-job-is-rescaled-td39462.html
> where people seem to be using retained checkpoints instead of savepoints
> to do manual job restarts with rescaling.
> 3) There are claims like here:
>
> https://lists.apache.org/thread.html/4299518f4da2810aa88fe6b21f841880b619f3f8ac264084a318c034%40%3Cuser.flink.apache.org%3E
> that in HA setup JobManager is able to restart from a checkpoint even if
> operators are added/removed or parallelism is changed (in this case I'm not
> sure if the checkpoints used by HA JM in `high-availability.storageDir` is
> the same thing as usual checkpoints).
>
> So I guess the questions are:
> 1) Can retained checkpoints be safely used for manual restarting and
> rescaling a job?
> 2) Are checkpoints made by HA JM structurally different from the usual
> ones? Can they be used to restore a job with a changed job graph?
>
> Thank you,
> Igor
>
>


Side outputs PyFlink

2021-05-20 Thread Wouter Zorgdrager
Dear Flink community,

First of all, I'm very excited about the new 1.13 release. Among
other features, I'm particularly excited about the support of stateful
operations in Python. I think it will make the wonders of stream processing
and the power of Flink accessible to more developers.

I'm currently playing around a bit with these new features and I was
wondering if there are already plans to support side output in the Python
API? This already works pretty neatly in the DataStream API, but I couldn't
find any communication on adding this to PyFlink.

In the meantime, what do you suggest for a workaround on side outputs?
Intuitively, I would copy a stream and add a filter for each side output
but this seems a bit inefficient. In that setup, each side output will need
to go over the complete stream. Any ideas?

Thanks in advance!
Regards,
Wouter


Re: Prometheus Reporter Enhancement

2021-05-20 Thread Chesnay Schepler
There is no plan to generally exclude label keys from the metric 
identifier/logical scope. They ensure that the label set for a given 
identifier/scope is unique, i.e., you can't have 2 metrics called 
"numRecordsIn" with different label sets. Changing this would also break 
all existing setups, so if anything it would have to be an opt-in feature.


What I envision, rather, is for the user to have more control over the 
metric identifier/logical scope via the scope formats. These are 
currently quite limited, controlling only part of the final 
identifier, while the logical scope isn't controllable at all.
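
(For completeness, the scope formats I mean are the metrics.scope.* options
in flink-conf.yaml; these are roughly the defaults for task and operator
metrics, and they only affect the scope part of the identifier:)

metrics.scope.task: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
metrics.scope.operator: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>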


Generally though, there's a fair bit of internal re-structuring that 
we'd like to do before extending the metric system further, because 
we've been tacking on more and more things since it was released in 
1.3.0 (!!!) but barely refactored things to properly fit together.


On 5/20/2021 12:58 AM, Mason Chen wrote:
Are there any plans to rework some of the metric name formulations 
(getMetricIdentifier or getLogicalScope)? Currently, the label keys 
and/or label values are concatenated in the metric name and the 
information is redundant and makes the metric names longer.


Would it make sense to remove the tag related information 
(getAllVariables())?


On May 18, 2021, at 3:45 PM, Chesnay Schepler wrote:


There is already a ticket for this. Note that this functionality 
should be implemented in a generic fashion to be usable for all 
reporters.


https://issues.apache.org/jira/browse/FLINK-17495

On 5/18/2021 8:16 PM, Andrew Otto wrote:

Sounds useful!

On Tue, May 18, 2021 at 2:02 PM Mason Chen wrote:


Hi all,

Would people appreciate enhancements to the prometheus reporter
to include extra labels via a configuration, as a contribution
to Flink? I can see it being useful for adding labels that are
not job specific, but infra specific.

The change would be nicely integrated with the Flink’s
ConfigOptions and unit tested.

Best,
Mason









Re: Parallelism in Production: Best Practices

2021-05-20 Thread Jan Brusch



Hi Yaroslav,

here's a fourth option that we usually use: We set the default 
parallelism once when we initially deploy the app (maybe change it a few 
times in the beginning). From that point on, we rescale by either resizing 
the TaskManager-Nodes or redistributing the parallelism over more or fewer 
TaskManager-Nodes.


For example: We start to run an app with a default parallelism of 64 and 
we initially distribute this over 16 TaskManager-Nodes with 4 taskSlots 
each. Then we see that we have scaled way too high for the actual 
workload. We now have two options: Either reduce the hardware on the 16 
Nodes (CPU and RAM) or re-scale horizontally by re-distributing the 
workload over 8 TaskManager-Nodes with 8 taskSlots each.
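
Configuration-wise, that second layout boils down to something like this
(a sketch; plus 8 TaskManager replicas in the deployment itself):

# flink-conf.yaml
parallelism.default: 64
taskmanager.numberOfTaskSlots: 8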


Since we leave the parallelism of the job untouched in either case, we can 
easily rescale by re-deploying the whole cluster and letting it resume from 
the last checkpoint. A cleaner way would probably be to do this 
re-deployment with explicit savepoints.


We are doing this in kubernetes where both scaling options are really 
easy to carry out. But the same concepts should work on any other setup, 
too.



Hope that helps

Jan

On 19.05.21 20:00, Yaroslav Tkachenko wrote:

Hi everyone,

I'd love to learn more about how different companies approach 
specifying Flink parallelism. I'm specifically interested in real, 
production workloads.


I can see a few common patterns:

- Rely on default parallelism, scale by changing parallelism for the 
whole pipeline. I guess it only works if the pipeline doesn't have 
obvious bottlenecks. Also, it looks like the new reactive mode makes 
specifying parallelism for an operator obsolete 
(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/elastic_scaling/#configuration 
)


- Rely on default parallelism for most of the operators, but override 
it for some. For example, it doesn't make sense for a Kafka source to 
have parallelism higher than the number of partitions it consumes. 
Some custom sinks could choose lower parallelism to avoid overloading 
their destinations. Some transformation steps could choose higher 
parallelism to distribute the work better, etc.


- Don't rely on default parallelism and configure parallelism 
explicitly for each operator. This requires very good knowledge of 
each operator in the pipeline, but it could lead to very good performance.


Is there a different pattern that I miss? What do you use? Feel free 
to share any resources.


If you do specify it explicitly, what do you think about the reactive 
mode? Will you use it?


Also, how often do you change parallelism? Do you set it once and 
forget once the pipeline is stable? Do you keep re-evaluating it?


Thanks.


--
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501