[Survey] Demand collection for stream SQL window join

2020-08-26 Thread Danny Chan
Hi users, I would like to collect some use cases for the window join [1],
which is already supported in the DataStream API. The goal is to decide
whether to support it on the SQL side as well. For example, a join of two
tumbling windows might look like this:

```sql
select ... window_start, window_end
from TABLE(
  TUMBLE(
    DATA => TABLE table_a,
    TIMECOL => DESCRIPTOR(rowtime),
    SIZE => INTERVAL '1' MINUTE)) tumble_a
    [LEFT | RIGHT | FULL OUTER] JOIN TABLE(
  TUMBLE(
    DATA => TABLE table_b,
    TIMECOL => DESCRIPTOR(rowtime),
    SIZE => INTERVAL '1' MINUTE)) tumble_b
on tumble_a.col1 = tumble_b.col1 and ...
```

I discussed this offline with a few companies (Tencent, Bytedance and
Meituan), and interval join seems to be the most common case. Window join
use cases appear to be rare, so I am looking forward to feedback here.

In particular, it would be appreciated if you could share your window join
use cases (implemented with the Flink DataStream API or with other programs)
and explain why the window join is a must, that is, why it cannot be replaced
by a regular stream join or an interval join.

Thanks in advance ~

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/joining.html

Best,
Danny Chan
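
For readers comparing the join types mentioned above, here is a minimal plain-Java sketch of what a tumbling window join means semantically: two records join only if their keys match and their timestamps fall into the same fixed-size window. It does not use Flink. The class `TumblingWindowJoinSketch`, the `Event` type and the sample data are hypothetical, and the window-start formula assumes non-negative timestamps and no window offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class TumblingWindowJoinSketch {

    /** Hypothetical event: a join key plus an event-time timestamp in milliseconds. */
    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    /** Start of the tumbling window that contains the timestamp (assumes timestamp >= 0, no offset). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Inner join: emits "key@windowStart" for every pair with equal key and equal window. */
    static List<String> join(List<Event> left, List<Event> right, long sizeMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            long leftWindow = windowStart(l.timestampMs, sizeMs);
            for (Event r : right) {
                if (l.key.equals(r.key) && leftWindow == windowStart(r.timestampMs, sizeMs)) {
                    out.add(l.key + "@" + leftWindow);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        List<Event> a = Arrays.asList(new Event("k1", 10_000L), new Event("k1", 59_000L));
        List<Event> b = Arrays.asList(new Event("k1", 30_000L), new Event("k1", 61_000L));
        System.out.println(join(a, b, oneMinute));
    }
}
```

In the sample data, the records at 59 s and 61 s share a key and are only 2 s apart, yet they fall into different one-minute windows and do not join. An interval join with a bound of a few seconds would match them, which is one practical difference between the two join types.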


Re: Problem with window grouping after a Flink interval join

2020-08-26 Thread Benchao Li
Hi Danny,

You are right, we have already taken the watermark lateness into account in
this case.
However, our interval join operator has a bug that can still produce records
that are later than the watermark.
I've created an issue [1]; we can discuss it in the Jira issue.

[1] https://issues.apache.org/jira/browse/FLINK-18996
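
The mismatch between the two delays discussed in this thread can be checked with a few lines of arithmetic. The sketch below is plain Java, not Flink code: the class and method names are hypothetical, and the two formulas are copied from the messages quoted below rather than verified against the Flink source.

```java
public final class IntervalJoinDelaySketch {

    /** Delay after rowTime at which an unmatched left row is emitted (formula as quoted in the thread). */
    static long cleanUpDelay(long leftRelativeSize, long rightRelativeSize, long allowedLateness) {
        return leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 + allowedLateness + 1;
    }

    /** Amount by which the output watermark is held back (formula as quoted in the thread). */
    static long watermarkHoldBack(long leftRelativeSize, long rightRelativeSize, long allowedLateness) {
        return Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness;
    }

    public static void main(String[] args) {
        long hourMs = 3_600_000L;
        // Values from the question: leftRelativeSize = 1 hour, rightRelativeSize = 0.
        // allowedLateness = 0 is an assumption made for this example.
        long cleanUp = cleanUpDelay(hourMs, 0L, 0L);
        long holdBack = watermarkHoldBack(hourMs, 0L, 0L);
        System.out.println("emit delay ms:      " + cleanUp);
        System.out.println("watermark delay ms: " + holdBack);
        System.out.println("gap ms:             " + (cleanUp - holdBack));
    }
}
```

With these inputs the emit delay exceeds the watermark hold-back by 1,800,001 ms, roughly the half hour described in the question, which is why a downstream window could treat the null-padded rows as late.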

On Wed, Aug 26, 2020 at 8:09 PM Danny Chan wrote:

> For SQL, we always hold back the watermark when we emit elements. For a
> time interval join, the delay is:
>
> return Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness;
>
> In your case, the watermark is held back by 1 hour, so the left join
> records should not be late when subsequent operators consume them.
>
> See KeyedCoProcessOperatorWithWatermarkDelay and
> RowTimeIntervalJoin.getMaxOutputDelay for details.
>
> Best,
> Danny Chan
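> On July 3, 2020 at 3:29 PM +0800, 元始 (Bob Hu) <657390...@qq.com> wrote:
>
> Hello, I would like to ask a question:
> Is there a problem with doing a window group-by after an interval join of
> two streaming tables in Flink? Some left join records that find no match
> are dropped.
> For example, the join condition is select * from a,b where a.id=b.id and
> b.rowtime between a.rowtime and a.rowtime + INTERVAL '1' HOUR.
> From the source code, leftRelativeSize = 1 hour and rightRelativeSize = 0,
> and for the left stream cleanUpTime = rowTime + leftRelativeSize +
> (leftRelativeSize + rightRelativeSize) / 2 + allowedLateness + 1, so
> left-table records without a match are emitted after 1.5 hours (with null
> for the right table). The watermark adjustment, however, is
> Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness, which is
> 1 hour. So by the time such a record is emitted, isn't the watermark
> already 0.5 hours ahead of the left-table rowtime? When a group by is then
> applied to the joined stream, these records with an empty right side are
> dropped.
> Flink version: 1.10.0.
>
> Here is a piece of my test code: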
>
> import org.apache.commons.net.ntp.TimeStamp;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.ScalarFunction;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.IOUtils;
>
> import java.io.BufferedReader;
> import java.io.InputStreamReader;
> import java.io.Serializable;
> import java.net.InetSocketAddress;
> import java.net.Socket;
> import java.sql.Timestamp;
> import java.text.SimpleDateFormat;
> import java.util.ArrayList;
> import java.util.Date;
> import java.util.List;
>
> public class TimeBoundedJoin {
>
>     public static AssignerWithPeriodicWatermarks<Row> getWatermark(
>             Integer maxIdleTime, long finalMaxOutOfOrderness) {
>         AssignerWithPeriodicWatermarks<Row> timestampExtractor =
>                 new AssignerWithPeriodicWatermarks<Row>() {
>             private long currentMaxTimestamp = 0;
>             private long lastMaxTimestamp = 0;
>             private long lastUpdateTime = 0;
>             boolean firstWatermark = true;
>             //Integer maxIdleTime = 30;
>
>             @Override
>             public Watermark getCurrentWatermark() {
>                 if (firstWatermark) {
>                     lastUpdateTime = System.currentTimeMillis();
>                     firstWatermark = false;
>                 }
>                 // Remember when the maximum timestamp last advanced.
>                 if (currentMaxTimestamp != lastMaxTimestamp) {
>                     lastMaxTimestamp = currentMaxTimestamp;
>                     lastUpdateTime = System.currentTimeMillis();
>                 }
>                 // Source idle for more than maxIdleTime seconds:
>                 // derive the watermark from the wall clock instead.
>                 if (maxIdleTime != null
>                         && System.currentTimeMillis() - lastUpdateTime > maxIdleTime * 1000) {
>                     return new Watermark(new Date().getTime() - finalMaxOutOfOrderness * 1000);
>                 }
>                 return new Watermark(currentMaxTimestamp - finalMaxOutOfOrderness * 1000);
>             }
>
>             @Override
>             public long extractTimestamp(Row row, long previousElementTimestamp) {
>                 // Field 1 is read as a long first, then as a java.sql.Timestamp.
>                 Object value = row.getField(1);
>                 long timestamp;
>                 try {
>                     timestamp = (long) value;
>                 } catch (Exception e) {
>                     timestamp = ((Timestamp) value).getTime();
>                 }
>                 if (timestamp > currentMaxTimestamp) {
>                     currentMaxTimestamp = timestamp;
>                 }
>                 return timestamp;
>             }
>         };
>         return timestampExtractor;
>     }
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment bsEnv =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings bsSettings =
>                 EnvironmentSettings.newI

Re: Default Flink Metrics Graphite

2020-08-26 Thread Vijayendra Yadav
Hi Chesnay and Dawid,

I see multiple entries like the following in the log:

2020-08-26 23:46:19,105 WARN org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while registering metric: numRecordsIn.
java.lang.IllegalArgumentException: A metric named ip-99--99-99.taskmanager.container_1596056409708_1570_01_06.vdcs-kafka-flink-test.Map.0.numRecordsIn already exists
    at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
2020-08-26 23:46:19,094 WARN org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while registering metric: numRecordsOut.
java.lang.IllegalArgumentException: A metric named ip-99--99-999.taskmanager.container_1596056409708_1570_01_05.vdcs-kafka-flink-test.Map.2.numRecordsOut already exists
    at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
    at org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:131)
    at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
    at org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:131)
    at org.apache.flink.runtime.metrics.MetricRegistryImpl.register(MetricRegistryImpl.java:343)
    at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:426)
    at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:359)
    at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:349)
    at org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup.<init>(OperatorIOMetricGroup.java:41)
    at org.apache.flink.runtime.metrics.groups.OperatorMetricGroup.<init>(OperatorMetricGroup.java:48)
    at org.apache.flink.runtime.metrics.groups.TaskMetricGroup.lambda$getOrAddOperator$0(TaskMetricGroup.java:154)
    at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
    at org.apache.flink.runtime.metrics.groups.TaskMetricGroup.getOrAddOperator(TaskMetricGroup.java:154)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:180)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
    at org.apache.flink.streaming.api.operators.SimpleOperatorFactory.createStreamOperator(SimpleOperatorFactory.java:75)
    at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:48)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:429)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)

Regards,
Vijay


On Wed, Aug 26, 2020 at 7:53 AM Chesnay Schepler  wrote:

> metrics.reporter.grph.class:
> org.apache.flink.metrics.graphite.GraphiteReporter
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#graphite-orgapacheflinkmetricsgraphitegraphitereporter
>
> On 26/08/2020 16:40, Vijayendra Yadav wrote:
>
> Hi Dawid,
>
> I am on Flink 1.10.0. What is the alternative for this version?
>
> Regards,
> Vijay
>
>
> On Aug 25, 2020, at 11:44 PM, Dawid Wysakowicz wrote:
>
> Hi Vijay,
>
> I think the problem might be that you are using the wrong version of the
> reporter.
>
> You say you used flink-metrics-graphite-1.10.0.jar from 1.10 as a plugin,
> but the reporter was only migrated to plugins in 1.11 [1].
>
> I'd recommend trying it out with matching 1.11 versions of Flink and the
> Graphite reporter.
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-16965
> On 26/08/2020 08:04, Vijayendra Yadav wrote:
>
> Hi Nikola,
>
> To rule out any other cluster issues, I have now tried it locally. The
> steps are as follows, but I don't see any metrics yet.
>
> 1) Set up local Graphite
>
> docker run -d\
>  --name graphite\
>  --restart=always\
>  -p 80:80\
>  -p 2003-2004:2003-2004\
>  -p 2023-2024:2023-2024\
>  -p 8125:8125/udp\
>  -p 8126:8126\
>  graphiteapp/graphite-statsd
>
> Mapped Ports
>
> Host   Container   Service
> 80     80          nginx
> 2003   2003        carbon receiver - plaintext
> 2004   2004        carbon receiver - pickle
> 2023   2023        carbon aggregator - plaintext
> 
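
When no metrics show up, it can help to confirm that the local Graphite container accepts data at all, independently of the Flink reporter. The sketch below is plain Java with hypothetical class and metric names. It formats one line in Carbon's plaintext format (metric path, value, timestamp in epoch seconds) and sends it to the plaintext receiver on port 2003 from the table above; `localhost` is an assumption matching the local Docker setup.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public final class GraphitePlaintextProbe {

    /** Builds one plaintext-protocol line: "<path> <value> <epochSeconds>\n". */
    static String formatLine(String path, double value, long epochSeconds) {
        return path + " " + value + " " + epochSeconds + "\n";
    }

    /** Opens a TCP connection to the carbon receiver and writes the line. */
    static void send(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            out.write(line.getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // "flink.probe.test" is a made-up metric name used only for this check.
        String line = formatLine("flink.probe.test", 1.0, System.currentTimeMillis() / 1000L);
        send("localhost", 2003, line);
        System.out.print("sent: " + line);
    }
}
```

If the probe metric appears in the Graphite UI, the receiver is reachable and the remaining suspects are the reporter configuration and the jar placement discussed earlier in the thread.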

Re: Failures due to inevitable high backpressure

2020-08-26 Thread David Anderson
One other thought: some users experiencing this have found it preferable to
increase the checkpoint timeout to the point where it is effectively
infinite. Checkpoints that can't time out are likely to complete eventually,
which is better than landing in the vicious cycle you described.

David

On Wed, Aug 26, 2020 at 7:41 PM David Anderson 
wrote:

> You should begin by trying to identify the cause of the backpressure,
> because the appropriate fix depends on the details.
>
> Possible causes that I have seen include:
>
> - the job is inadequately provisioned
> - blocking i/o is being done in a user function
> - a huge number of timers are firing simultaneously
> - event time skew between different sources is causing large amounts of
> state to be buffered
> - data skew (a hot key) is overwhelming one subtask or slot
> - external systems can't keep up (e.g., a sink)
> - lengthy GC pauses caused by running lots of slots per TM with the
> FsStateBackend
> - contention for critical resources (e.g., using a NAS as the local disk
> for RocksDB)
>
> Unaligned checkpoints [1], new in Flink 1.11, should address this problem
> in some cases, depending on the root cause. But first you should try to
> figure out why you have high backpressure, because a number of the causes
> listed above won't be helped by changing to unaligned checkpoints.
>
> Best,
> David
>
> [1]
> https://flink.apache.org/news/2020/07/06/release-1.11.0.html#unaligned-checkpoints-beta
>


Re: Failures due to inevitable high backpressure

2020-08-26 Thread David Anderson
You should begin by trying to identify the cause of the backpressure,
because the appropriate fix depends on the details.

Possible causes that I have seen include:

- the job is inadequately provisioned
- blocking i/o is being done in a user function
- a huge number of timers are firing simultaneously
- event time skew between different sources is causing large amounts of
state to be buffered
- data skew (a hot key) is overwhelming one subtask or slot
- external systems can't keep up (e.g., a sink)
- lengthy GC pauses caused by running lots of slots per TM with the
FsStateBackend
- contention for critical resources (e.g., using a NAS as the local disk
for RocksDB)

Unaligned checkpoints [1], new in Flink 1.11, should address this problem
in some cases, depending on the root cause. But first you should try to
figure out why you have high backpressure, because a number of the causes
listed above won't be helped by changing to unaligned checkpoints.

Best,
David

[1]
https://flink.apache.org/news/2020/07/06/release-1.11.0.html#unaligned-checkpoints-beta
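
Of the causes listed above, data skew is one that can be checked offline from a sample of keys. The following plain-Java sketch counts how a key sample would spread over subtasks and reports a max-to-mean ratio. The class and method names are hypothetical, and the partitioning by `hashCode` modulo parallelism is a simplification for illustration, not the way Flink actually assigns keys to key groups.

```java
import java.util.Arrays;
import java.util.List;

public final class KeySkewSketch {

    /** Counts records per subtask using simplified hash partitioning (an assumption, see lead-in). */
    static long[] recordsPerSubtask(List<String> keys, int parallelism) {
        long[] counts = new long[parallelism];
        for (String key : keys) {
            counts[Math.floorMod(key.hashCode(), parallelism)]++;
        }
        return counts;
    }

    /** Ratio of the busiest subtask to the mean load; 1.0 means perfectly balanced. */
    static double skewRatio(long[] counts) {
        long max = 0;
        long total = 0;
        for (long c : counts) {
            max = Math.max(max, c);
            total += c;
        }
        return total == 0 ? 1.0 : (double) max * counts.length / total;
    }

    public static void main(String[] args) {
        // Made-up sample: key "a" is hot.
        List<String> sample = Arrays.asList("a", "a", "a", "a", "a", "a", "b", "c");
        long[] counts = recordsPerSubtask(sample, 4);
        System.out.println(Arrays.toString(counts) + " skew=" + skewRatio(counts));
    }
}
```

A ratio well above 1 on real key samples would suggest a hot key, in which case unaligned checkpoints alone are unlikely to resolve the backpressure.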

On Wed, Aug 26, 2020 at 6:06 PM Hubert Chen  wrote:

> Hello,
>
> My Flink application has entered into a bad state and I was wondering if I
> could get some advice on how to resolve the issue.
>
> The sequence of events that led to a bad state:
>
> 1. A failure occurs (e.g., TM timeout) within the cluster
> 2. The application successfully recovers from the last completed checkpoint
> 3. The application consumes events from Kafka as quickly as it can. This
> introduces high backpressure.
> 4. A checkpoint is triggered
> 5. Another failure occurs (e.g., TM timeout, checkpoint timeout, Kafka
> transaction timeout) and the application loops back to step #2. This
> creates a vicious cycle where no progress is made.
>
> I believe the underlying issue is the application experiencing high
> backpressure. This can cause the TM to not respond to heartbeats or cause
> long checkpoint durations due to delayed processing of the checkpoint.
>
> I'm unsure about the best next steps to take. How do I ensure that
> heartbeats and checkpoints complete successfully when experiencing
> inevitable high backpressure?
>
> Best,
> Hubert
>


Re: OOM error for heap state backend.

2020-08-26 Thread Vishwas Siravara
Thanks Andrey,
My question is related to this part of the docs:

The FsStateBackend is encouraged for:

   - Jobs with large state, long windows, large key/value states.
   - All high-availability setups.

How large is "large state", not counting any overhead added by the framework?

Best,
Vishwas

On Wed, Aug 26, 2020 at 12:10 PM Andrey Zagrebin 
wrote:

> Hi Vishwas,
>
> > is this quantifiable with respect to JVM heap size on a single node
> > without the node being used for other tasks?
>
>
> I don't quite understand this question. I believe the recommendation in the
> docs has the same rationale: use larger state objects so that the Java
> object overhead pays off.
> RocksDB keeps state in memory and on disk in serialized form.
> Therefore it usually has a smaller footprint.
> Other jobs in the same task manager can potentially use another state
> backend, depending on their state requirements.
> All tasks in the same task manager share the JVM heap, because the task
> manager runs as a single JVM process on the machine where it is deployed.
>
> Best,
> Andrey
>

Re: OOM error for heap state backend.

2020-08-26 Thread Andrey Zagrebin
Hi Vishwas,

> is this quantifiable with respect to JVM heap size on a single node
> without the node being used for other tasks?


I don't quite understand this question. I believe the recommendation in the
docs has the same rationale: use larger state objects so that the Java
object overhead pays off.
RocksDB keeps state in memory and on disk in serialized form.
Therefore it usually has a smaller footprint.
Other jobs in the same task manager can potentially use another state
backend, depending on their state requirements.
All tasks in the same task manager share the JVM heap, because the task
manager runs as a single JVM process on the machine where it is deployed.

Best,
Andrey
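
To make the object-overhead argument concrete, here is a rough back-of-the-envelope sketch in plain Java. It estimates the heap footprint of a `String` key under stated assumptions: a 64-bit HotSpot JVM with compressed oops, 8-byte object alignment, and the Java 8 string layout (a `String` object holding a reference to a UTF-16 `char[]`). The class and method names are hypothetical, and the estimate covers only the key object itself, not any of Flink's internal state-table entries.

```java
public final class HeapFootprintSketch {

    // Assumptions: 64-bit HotSpot, compressed oops, 8-byte alignment.
    static final int OBJECT_HEADER = 12;
    static final int ARRAY_HEADER = 16; // object header plus 4-byte length field
    static final int REFERENCE = 4;
    static final int ALIGNMENT = 8;

    /** Rounds a size up to the next multiple of the alignment. */
    static long align(long bytes) {
        return (bytes + ALIGNMENT - 1) / ALIGNMENT * ALIGNMENT;
    }

    /** Estimated heap bytes of a Java 8 String with the given number of chars. */
    static long java8StringBytes(int chars) {
        long stringObject = align(OBJECT_HEADER + REFERENCE + 4); // value reference + cached hash
        long charArray = align(ARRAY_HEADER + 2L * chars);        // 2 bytes per UTF-16 char
        return stringObject + charArray;
    }

    public static void main(String[] args) {
        int chars = 20;
        long heapBytes = java8StringBytes(chars);
        System.out.println("serialized payload bytes: " + chars);
        System.out.println("estimated heap bytes:     " + heapBytes);
        System.out.println("ratio:                    " + (double) heapBytes / chars);
    }
}
```

Under these assumptions a 20-character key occupies about 80 bytes on the heap, four times its 20-byte payload, before any per-entry bookkeeping is counted. That is in the same range as the 4x overhead reported in the quoted message below, though the real figure depends on the JVM version and settings.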

On Wed, Aug 26, 2020 at 6:52 PM Vishwas Siravara 
wrote:

> Hi Andrey,
> Thanks for getting back to me so quickly. The screenshots are for a 1 GB
> heap. The keys for the state are 20-character strings (20 bytes, we don't
> have multi-byte characters), so the overhead seems to be quite large (4x),
> even in comparison to the checkpoint size (which already adds an overhead).
> This document [1] says to use the FS/heap backend for large states. Is this
> quantifiable with respect to JVM heap size on a single node, without the
> node being used for other tasks?
> I have attached the GC logs for the TM and JM.
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
>
> Best,
> Vishwas
>
> On Wed, Aug 26, 2020 at 11:29 AM Andrey Zagrebin 
> wrote:
>
>> Hi Vishwas,
>>
>> I believe the screenshots are from a heap size of 1GB?
>>
>> There are indeed many internal Flink state objects. They are overhead
>> that Flink needs in order to organise and track the state on-heap.
>> Depending on the actual size of your state objects, the overhead may be
>> relatively large compared to the actual state size.
>> For example, if you just keep integers in your state, then the overhead is
>> probably a couple of times larger than the state itself.
>> It is not easy to estimate the exact on-heap size without thorough analysis.
>>
>> The checkpoint has little overhead and includes only the actual state data:
>> your serialized state objects, which are probably smaller than their heap
>> representation.
>>
>> So my guess is that the heap representation of the state is much bigger
>> than the checkpoint size.
>>
>> I also cc other people who might add more thoughts about on-heap state
>> size.
>>
>> You could also provide GC logs as Xintong suggested.
>>
>> Best,
>> Andrey
>>
>> On Wed, Aug 26, 2020 at 4:21 PM Vishwas Siravara 
>> wrote:
>>
>>> Hi Andrey and Xintong. The 2.5 GB figure is from the Flink web UI
>>> (checkpoint size). I took a heap dump and could not find any memory leak
>>> from user code. I see similar behaviour with a smaller heap: on a 1 GB
>>> heap, the state size in the checkpoint UI is 180 MB. I am attaching some
>>> screenshots of heap profiles in case they help. When the state grows, GC
>>> takes a long time and sometimes the job manager removes the TM slot because
>>> of the 1ms timeout and tries to restore the task in another task manager.
>>> This creates a cascading effect and affects other jobs running on the
>>> cluster. My tests were run in a single-node cluster with 1 TM and 4 task
>>> slots, with a parallelism of 4.
>>>
>>> Best,
>>> Vishwas
>>>
>>> On Tue, Aug 25, 2020 at 10:02 AM Andrey Zagrebin 
>>> wrote:
>>>
>>>> Hi Vishwas,
>>>>
>>>> If you use Flink 1.7, check the older memory model docs [1], because you
>>>> referred to the new memory model of Flink 1.10 in your reference 2.
>>>> Could you also share a screenshot showing where you get the state size of
>>>> 2.5 GB? Do you mean the Flink web UI?
>>>> Generally, it is quite hard to estimate the on-heap size of state Java
>>>> objects. I have never heard of such a Flink metric.
>>>>
>>>> Best,
>>>> Andrey
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
>>>>
>>>> On Mon, Aug 24, 2020 at 4:05 AM Xintong Song
>>>> wrote:
>>>>
>>>>> Hi Vishwas,
>>>>>
>>>>> According to the log, heap space is 13+GB, which looks fine.
>>>>>
>>>>> Several reasons might lead to the heap space OOM:
>>>>>
>>>>>    - Memory leak
>>>>>    - Not enough GC threads
>>>>>    - Concurrent GC starts too late
>>>>>    - ...
>>>>>
>>>>> I would suggest taking a look at the GC logs.
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Aug 21, 2020 at 10:34 PM Vishwas Siravara
>>>>> wrote:
>>>>>
>>>>>> Hi guys,
>>>>>> I use Flink version 1.7.2.
>>>>>> I have a stateful streaming job which uses a keyed process function.
>>>>>> I use the heap state backend. Although I set the TM heap size to 16 GB, I
>>>>>> get an OOM error when the state size is around 2.5 GB (I get the state
>>>>>> size from the dashboard). I have set taskmanager.memory.fraction: 0.01
>>>>>> (which I believe is for native calls off heap) [1]. For an 8 GB TM heap
>>>>>> setting, the OOM errors start showing up when the state size reaches
>>>>>> 1 GB. This I find puzzling because I wo

Re: Setting job/task manager memory management in kubernetes

2020-08-26 Thread Alexey Trenikhun
Hello,
Which version of Flink are you using? If you are on 1.10 or later, please
check [1]; the memory options have different property names.

[1] - 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/memory/mem_setup.html

Thanks,
Alexey

From: Sakshi Bansal 
Sent: Monday, August 24, 2020 3:30 AM
To: user@flink.apache.org 
Subject: Setting job/task manager memory management in kubernetes

Hello,

I am trying to set the heap size of the job manager and task manager when
deploying a job in Kubernetes. I have set jobmanager.heap.size and
taskmanager.heap.size. However, my custom values are not being used; the job
starts with automatically generated values instead. How can I set custom
values?

--
Thanks and Regards
Sakshi Bansal


Re: Example flink run with security options? Running on k8s in my case

2020-08-26 Thread Andrey Zagrebin
Hi Adam,

Maybe also check your SSL setup on a local cluster, to rule out anything
Kubernetes-related.

Best,
Andrey

On Wed, Aug 26, 2020 at 3:59 PM Adam Roberts  wrote:

> Hey Nico - thanks for the prompt response, good catch - I've just tried
> with the two security options (enabling rest and internal SSL
> communications) and still hit the same problem
>
> I've also tried turning off security (both in my Job definition and in my
> Flink cluster JobManager/TaskManager settings) and the communication does
> happen successfully, suggesting all is well otherwise.
>
> With regards to testing with just a regular curl, I switched security back
> on and did the curl, using this:
>
>
> openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in
> /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes
>
> curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081
>
> curl --cacert rest.pem --cert rest.pem
> tls-flink-cluster-1-11-jobmanager:8081
>
> from the Job CR pod, which is the one that executes flink run against the
> JobManager I'd like to connect to.
>
> That gives
>
>
> $ openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in
> /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes
>
> curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081
>
> curl --cacert rest.pem --cert rest.pem
> tls-flink-cluster-1-11-jobmanager:8081
> 139676043637888:error:0D07207B:asn1 encoding routines:ASN1_get_object:header too long:../crypto/asn1/asn1_lib.c:101:
>
> so I wonder if my security setup itself is flawed... I'll be happy to
> share the scripting I use for that if folks feel it'll be of use. Thanks
> again.
>
>
> - Original message -
> From: Nico Kruber 
> To: user@flink.apache.org
> Cc: Adam Roberts 
> Subject: [EXTERNAL] Re: Example flink run with security options? Running
> on k8s in my case
> Date: Wed, Aug 26, 2020 11:40 AM
>
> Hi Adam,
> the flink binary will pick up any configuration from the flink-conf.yaml of
> its directory. If that is the same as in the cluster, you wouldn't have to
> pass most of your parameters manually. However, if you prefer not having a
> flink-conf.yaml in place, you could remove the security.ssl.internal.*
> parameters from the call, since those only affect internal communication.
>
> If the client's connection to the JM is denied, this would also show up in
> the JM logs, which you could check.
>
> To check whether your whole setup works, I would suggest trying without
> security enabled first and then enabling it (just to rule out any other
> issues).
>
> From the commands you mentioned, it looks like you're just missing
> security.ssl.rest.enabled=true; because of that, the client would not use
> SSL for the connection.
>
> For more information and setup, I recommend reading through [1], which also
> contains an example at the bottom of the page showing how to use curl to
> test or use the REST endpoint.
>
>
> Nico
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-ssl.html
>
>
> On Tuesday, 25 August 2020 14:40:04 CEST Adam Roberts wrote:
> > Hey everyone, I've been experimenting with Flink using
> > https://github.com/GoogleCloudPlatform/flink-on-k8s-operator and I
> > believe I've successfully deployed a JobManager and TaskManager with
> > security enabled, and a self-signed certificate (the pods come up great).
> > However, I can't do much with this - I can't port-forward and access the
> > UI, nor can I submit jobs to it by running another pod and using the DNS
> > name lookup of the service.
> > I always get
> >
> > The program finished with the following exception:
> > org.apache.flink.client.program.ProgramInvocationException: The main method
> > caused an error: java.util.concurrent.ExecutionException:
> > org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> > JobGraph.
> > ...
> >
> > Caused by: org.apache.flink.runtime.rest.ConnectionClosedException: Channel
> > became inactive. ... 37 more
> >
> >
> > and this is even with all of the -D security options provided.
> >
> > The versions of Flink are the same for both my Job and my FlinkCluster
> > (1.11.1).
> > Is this a sensible thing to do? If I weren't using the operator for
> > example, would users be expected to flink run with all of these options?
> > Does anything look odd here? My guess is because security's on, the Job
> > Manager refuses to talk to my submitter.
> > Running as the flink user in the container, I do
> >
> >   securityContext:
> >     runAsUser:
> >     runAsGroup:
> >   containers:
> >   - name: wordcount
> >     image: adamroberts/mycoolflink:latest
> >     args:
> >     - /opt/flink/bin/flink
> >     - run
> >     - -D
> >     - security.ssl.rest.keystore=/etc/flink-secrets/flink-tls-keystore.key
> >     - -D
> >     - security.ssl.rest.truststore=/etc/flink-secrets/flink-tls-ca-truststore

Re: OOM error for heap state backend.

2020-08-26 Thread Andrey Zagrebin
Hi Vishwas,

I believe the screenshots are from a heap size of 1GB?

There are indeed many internal Flink state objects. They are overhead that
Flink needs in order to organise and track the state on-heap.
Depending on the actual size of your state objects, the overhead may be
relatively large compared to the actual state size.
For example, if you just keep integers in your state, then the overhead is
probably a couple of times larger than the state itself.
It is not easy to estimate the exact on-heap size without thorough analysis.

The checkpoint has little overhead and includes only the actual state data:
your serialized state objects, which are probably smaller than their heap
representation.

So my guess is that the heap representation of the state is much bigger
than the checkpoint size.

I also cc other people who might add more thoughts about on-heap state size.

You could also provide GC logs as Xintong suggested.

Best,
Andrey

On Wed, Aug 26, 2020 at 4:21 PM Vishwas Siravara 
wrote:

> Hi Andrey and Xintong. 2.5 GB is from the flink web UI( checkpoint size).
> I took a heap dump and I could not find any memory leak from user code. I
> see the similar behaviour on smaller heap size, on a 1GB heap , the state
> size from checkpoint UI is 180 MB. Attaching some  screenshots of heap
> profiles if it helps. So when the state grows GC takes a long time and
> sometimes the job manager removes TM slot because of 1ms timeout and
> tries to restore the task in another task manager, this creates a cascading
> effect and affects other jobs running on the cluster. My tests were run in
> a single node cluster with 1 TM and 4 task slots with a parallelism of 4.
>
> Best,
> Vishwas
>
> On Tue, Aug 25, 2020 at 10:02 AM Andrey Zagrebin 
> wrote:
>
>> Hi Vishwas,
>>
>> If you use Flink 1.7, check the older memory model docs [1] because you
>> referred to the new memory model of Flink 1.10 in your reference 2.
>> Could you also share a screenshot where you get the state size of 2.5 GB?
>> Do you mean Flink WebUI?
>> Generally, it is quite hard to estimate the on-heap size of state java
>> objects. I never heard about such a Flink metric.
>>
>> Best,
>> Andrey
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
>>
>> On Mon, Aug 24, 2020 at 4:05 AM Xintong Song 
>> wrote:
>>
>>> Hi Vishwas,
>>>
>>> According to the log, heap space is 13+GB, which looks fine.
>>>
>>> Several reasons might lead to the heap space OOM:
>>>
>>>- Memory leak
>>>- Not enough GC threads
>>>- Concurrent GC starts too late
>>>- ...
>>>
>>> I would suggest taking a look at the GC logs.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Fri, Aug 21, 2020 at 10:34 PM Vishwas Siravara 
>>> wrote:
>>>
 Hi guys,
 I use flink version 1.7.2
 I have a stateful streaming job which uses a keyed process function. I
 use heap state backend. Although I set TM heap size to 16 GB, I get OOM
 error when the state size is around 2.5 GB(from dashboard I get the state
 size). I have set taskmanager.memory.fraction: 0.01 (which I believe is for
 native calls off heap). [1] . For an 8 GB TM heap setting , the OOM errors
 start showing up when the state size reaches 1 GB. This I find puzzling
 because I would expect to get a lot more space on the heap for state when I
 change the size to 16 GB, what fraction of the heap is used by the
 framework ?[2]. Below is the stack trace for the exception. How can I
 increase my state size on the heap ?

 2020-08-21 02:05:54,443 INFO
  org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Memory
 usage stats: [HEAP: 11920/13653/13653 MB, NON HEAP: 130/154/-1 MB
 (used/committed/max)]
 2020-08-21 02:05:54,444 INFO
  org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Direct
 memory stats: Count: 32796, Total Capacity: 1074692520, Used Memory:
 1074692521
 2020-08-21 02:05:54,444 INFO
  org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Off-heap
 pool stats: [Code Cache: 51/55/240 MB (used/committed/max)], [Metaspace:
 70/88/-1 MB (used/committed/max)], [Compressed Class Space: 8/11/1024 MB
 (used/committed/max)]
 2020-08-21 02:05:54,444 INFO
  org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Garbage
 collector stats: [PS Scavenge, GC TIME (ms): 481035, GC COUNT: 1770], [PS
 MarkSweep, GC TIME (ms): 8720945, GC COUNT: 265]
 2020-08-21 02:05:54,446 INFO  org.apache.flink.runtime.taskmanager.Task
 - KeyedProcess (1/4) (23946753549293edc23e88f257980cb4)
 switched from RUNNING to FAILED.
 java.lang.OutOfMemoryError: Java heap space
 at java.lang.reflect.Array.newInstance(Array.java:75)
 at java.util.Arrays.copyOf(Arrays.java:3212)
 at java.util.Arrays.copyOf(Arrays.java:3181)
 at
 org.apache.flink.runtime.state.heap.AbstractHeapPriorit

Failures due to inevitable high backpressure

2020-08-26 Thread Hubert Chen
Hello,

My Flink application has entered into a bad state and I was wondering if I
could get some advice on how to resolve the issue.

The sequence of events that led to a bad state:

1. A failure occurs (e.g., TM timeout) within the cluster
2. The application successfully recovers from the last completed checkpoint
3. The application consumes events from Kafka as quickly as it can. This
introduces high backpressure.
4. A checkpoint is triggered
5. Another failure occurs (e.g., TM timeout, checkpoint timeout, Kafka
transaction timeout) and the application loops back to step #2. This
creates a vicious cycle where no progress is made.

I believe the underlying issue is the application experiencing high
backpressure. This can cause the TM to not respond to heartbeats or cause
long checkpoint durations due to delayed processing of the checkpoint.

I'm unsure about the best next steps to take. How do I ensure that
heartbeats and checkpoints complete successfully when experiencing
inevitable high backpressure?

Best,
Hubert


Re: Idle stream does not advance watermark in connected stream

2020-08-26 Thread Aljoscha Krettek
Yes, I'm afraid this analysis is correct. The StreamOperator, 
AbstractStreamOperator to be specific, computes the combined watermarks 
from both inputs here: 
https://github.com/apache/flink/blob/f0ed29c06d331892a06ee9bddea4173d6300835d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L573. 
The operator layer is not aware of idleness so it will never notice. The 
idleness only works on the level of inputs but is never forwarded to an 
operator itself.


To fix this we would have to also make operators aware of idleness such 
that they can take this into account when computing the combined output 
watermark.
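
The behaviour described above can be sketched in plain Java. This is a simplified illustration, not the actual AbstractStreamOperator code: the class and method names are made up for the example, and it only assumes that a two-input operator emits the minimum of the last watermark seen on each input.

```java
// Simplified sketch (assumption): the operator forwards min(input1, input2)
// and has no notion of an input being idle.
public class CombinedWatermarkSketch {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Called when input 1 advances; returns the watermark the operator would emit. */
    public long processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        return advance();
    }

    /** Called when input 2 advances; returns the watermark the operator would emit. */
    public long processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        return advance();
    }

    // The combined watermark only moves forward when the slower input moves.
    private long advance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
        }
        return combinedWatermark;
    }

    public static void main(String[] args) {
        CombinedWatermarkSketch operator = new CombinedWatermarkSketch();
        operator.processWatermark1(1_000L);
        long emitted = operator.processWatermark1(2_000L);
        // Input 2 never sent a watermark (it is idle), so nothing has advanced.
        System.out.println("still at Long.MIN_VALUE: " + (emitted == Long.MIN_VALUE));
    }
}
```

In this sketch the output stays at its initial value for as long as the second input is silent, which matches the symptom reported in the thread.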


Best,
Aljoscha

On 26.08.20 10:02, Dawid Wysakowicz wrote:

Hi Kien,

I am afraid this is a valid bug. I am not 100% sure, but the way I
understand the code, the idleness mechanism applies to input channels,
i.e. to the case where multiple parallel instances shuffle their results
to downstream operators.

In case of a two input operator, combining the watermark of two
different upstream operators happens inside of the operator itself.
There we do not have the idleness status. We do not have a status that a
whole upstream operator became idle. That's definitely a bug/limitation.

I'm also cc'ing Aljoscha who could maybe confirm my analysis.

Best,

Dawid

On 24/08/2020 16:00, Truong Duc Kien wrote:

Hi all,
We are testing the new Idleness detection feature in Flink 1.11,
however, it does not work as we expected:
When we connect two data streams, of which one is idle, the output
watermark of the CoProcessOperator does not increase, hence the program
cannot progress.

I've made a small project to illustrate the problem. The watermark
received by the sink does not increase at all until the idle source is
stopped.

https://github.com/kien-truong/flink-idleness-testing

Is this a bug or does the idleness detection not support this use case ?

Regards.
Kien






Re: Default Flink Metrics Graphite

2020-08-26 Thread Chesnay Schepler
metrics.reporter.grph.class: 
org.apache.flink.metrics.graphite.GraphiteReporter


https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#graphite-orgapacheflinkmetricsgraphitegraphitereporter

On 26/08/2020 16:40, Vijayendra Yadav wrote:

Hi Dawid,

I have 1.10.0 version of flink. What is alternative for this version ?

Regards,
Vijay



On Aug 25, 2020, at 11:44 PM, Dawid Wysakowicz 
 wrote:




Hi Vijay,

I think the problem might be that you are using a wrong version of 
the reporter.


You say you used flink-metrics-graphite-1.10.0.jar from 1.10 as a 
plugin, but it was migrated to plugins in 1.11 only[1].


I'd recommend trying it out with the same 1.11 version of Flink and 
Graphite reporter.


Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-16965

On 26/08/2020 08:04, Vijayendra Yadav wrote:

Hi Nikola,

To rule out any other cluster issues, I have tried it in my local 
now. Steps as follows, but don't see any metrics yet.


1) Set up local Graphite

docker run -d \
  --name graphite \
  --restart=always \
  -p 80:80 \
  -p 2003-2004:2003-2004 \
  -p 2023-2024:2023-2024 \
  -p 8125:8125/udp \
  -p 8126:8126 \
  graphiteapp/graphite-statsd


Mapped Ports

Host  Container  Service
80    80         nginx
2003  2003       carbon receiver - plaintext
2004  2004       carbon receiver - pickle
2023  2023       carbon aggregator - plaintext
2024  2024       carbon aggregator - pickle
8080  8080       Graphite internal gunicorn port (without Nginx proxying).
8125  8125       statsd
8126  8126       statsd admin

2) WebUI:





3) Run Flink example Job.
./bin/flink run 
./examples/flink-examples-streaming_2.11-1.11-SNAPSHOT-SocketWindowWordCount.jar 
--port 


with conf/flink-conf.yaml set as:

metrics.reporter.grph.factory.class: 
org.apache.flink.metrics.graphite.GraphiteReporterFactory

metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
metrics.reporter.grph.interval: 1 SECONDS

and graphite jar:

plugins/flink-metrics-graphite/flink-metrics-graphite-1.10.0.jar


4) Can't see any activity in webui graphite.


Could you review and let me know what is wrong here ? any other way 
you suggest to be able to view the raw metrics data ?
Also, do you have sample metrics raw format, you can share from any 
other project.


Regards,
Vijay




On Sun, Aug 23, 2020 at 9:26 PM Nikola Hrusov wrote:


Hi Vijay,

Your steps look correct to me.
Perhaps you can double check that the Graphite port you are
sending to is correct? The default carbon port is 2003 and if you
use the aggregator it is 2023.

You should be able to see in both the Flink jobmanager and
taskmanager logs that the metrics have been initialized with the
config you have pasted.

Regards,
Nikola Hrusov


On Mon, Aug 24, 2020 at 5:00 AM Vijayendra Yadav wrote:

Hi Team,

I am trying  to export Flink stream default metrics using
Graphite, but I can't find it in the Graphite metrics
console.  Could you confirm the steps below are correct?

1) Updated flink-conf.yaml

metrics.reporter.grph.factory.class:
org.apache.flink.metrics.graphite.GraphiteReporterFactory
metrics.reporter.grph.host: port
metrics.reporter.grph.port: 9109
metrics.reporter.grph.protocol: TCP
metrics.reporter.grph.interval: 30 SECONDS

2) Added Graphite jar in plugin folder :

ll /usr/lib/flink/plugins/metric/
flink-metrics-graphite-1.10.0.jar

3) Looking metrics in graphite server:

http://port:8080/metrics 

Note: No code change is done.

Regards,
Vijay






Re: Default Flink Metrics Graphite

2020-08-26 Thread Dawid Wysakowicz
In that case, I'd recommend following the instructions from the older docs [1].

The differences are that you should set:

metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter

and put the reporter jar into the /lib folder:

In order to use this reporter you must copy
/opt/flink-metrics-graphite-1.10.0.jar into the /lib folder of your
Flink distribution.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#graphite-orgapacheflinkmetricsgraphitegraphitereporter

Best,

Dawid

On 26/08/2020 16:40, Vijayendra Yadav wrote:
> Hi Dawid,
>
> I have 1.10.0 version of flink. What is alternative for this version ?
>
> Regards,
> Vijay
>
>>
>> On Aug 25, 2020, at 11:44 PM, Dawid Wysakowicz
>>  wrote:
>>
>> 
>>
>> Hi Vijay,
>>
>> I think the problem might be that you are using a wrong version of
>> the reporter.
>>
>> You say you used flink-metrics-graphite-1.10.0.jar from 1.10 as a
>> plugin, but it was migrated to plugins in 1.11 only[1].
>>
>> I'd recommend trying it out with the same 1.11 version of Flink and
>> Graphite reporter.
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-16965
>>
>> On 26/08/2020 08:04, Vijayendra Yadav wrote:
>>> Hi Nikola,
>>>
>>> To rule out any other cluster issues, I have tried it in my local
>>> now. Steps as follows, but don't see any metrics yet.
>>>
>>> 1) Set up local Graphite 
>>>
>>> docker run -d \
>>>   --name graphite \
>>>   --restart=always \
>>>   -p 80:80 \
>>>   -p 2003-2004:2003-2004 \
>>>   -p 2023-2024:2023-2024 \
>>>   -p 8125:8125/udp \
>>>   -p 8126:8126 \
>>>   graphiteapp/graphite-statsd
>>>
>>> Mapped Ports
>>>
>>> Host  Container  Service
>>> 80    80         nginx
>>> 2003  2003       carbon receiver - plaintext
>>> 2004  2004       carbon receiver - pickle
>>> 2023  2023       carbon aggregator - plaintext
>>> 2024  2024       carbon aggregator - pickle
>>> 8080  8080       Graphite internal gunicorn port (without Nginx proxying).
>>> 8125  8125       statsd
>>> 8126  8126       statsd admin
>>>
>>> 2) WebUI: 
>>>
>>> 
>>>
>>>
>>>
>>> 3) Run Flink example Job.
>>> ./bin/flink run
>>> ./examples/flink-examples-streaming_2.11-1.11-SNAPSHOT-SocketWindowWordCount.jar
>>> --port 
>>>
>>> with conf/flink-conf.yaml set as:
>>>
>>> metrics.reporter.grph.factory.class:
>>> org.apache.flink.metrics.graphite.GraphiteReporterFactory
>>> metrics.reporter.grph.host: localhost
>>> metrics.reporter.grph.port: 2003
>>> metrics.reporter.grph.protocol: TCP
>>> metrics.reporter.grph.interval: 1 SECONDS
>>>
>>> and graphite jar:
>>>
>>> plugins/flink-metrics-graphite/flink-metrics-graphite-1.10.0.jar
>>>
>>>
>>> 4) Can't see any activity in webui graphite. 
>>>
>>>
>>> Could you review and let me know what is wrong here ? any other way
>>> you suggest to be able to view the raw metrics data ?
>>> Also, do you have sample metrics raw format, you can share from any
>>> other project.
>>>
>>> Regards,
>>> Vijay
>>>
>>>
>>>
>>>
>>> On Sun, Aug 23, 2020 at 9:26 PM Nikola Hrusov wrote:
>>>
>>> Hi Vijay,
>>>
>>> Your steps look correct to me. 
>>> Perhaps you can double check that the Graphite port you are
>>> sending to is correct? The default carbon port is 2003 and if you
>>> use the aggregator it is 2023.
>>>
>>> You should be able to see in both the Flink jobmanager and
>>> taskmanager logs that the metrics have been initialized with the
>>> config you have pasted.
>>>
>>> Regards,
>>> Nikola Hrusov
>>>
>>>
>>> On Mon, Aug 24, 2020 at 5:00 AM Vijayendra Yadav wrote:
>>>
>>> Hi Team,
>>>
>>> I am trying  to export Flink stream default metrics using
>>> Graphite, but I can't find it in the Graphite metrics
>>> console.  Could you confirm the steps below are correct?
>>>
>>> 1) Updated flink-conf.yaml
>>>
>>> metrics.reporter.grph.factory.class:
>>> org.apache.flink.metrics.graphite.GraphiteReporterFactory
>>> metrics.reporter.grph.host: port
>>> metrics.reporter.grph.port: 9109
>>> metrics.reporter.grph.protocol: TCP
>>> metrics.reporter.grph.interval: 30 SECONDS
>>>
>>> 2) Added Graphite jar in plugin folder :
>>>
>>> ll /usr/lib/flink/plugins/metric/
>>> flink-metrics-graphite-1.10.0.jar
>>>
>>> 3) Looking metrics in graphi

Re: Default Flink Metrics Graphite

2020-08-26 Thread Vijayendra Yadav
Hi Dawid,

I have the 1.10.0 version of Flink. What is the alternative for this version?

Regards,
Vijay

> 
> On Aug 25, 2020, at 11:44 PM, Dawid Wysakowicz  wrote:
> 
> 
> Hi Vijay,
> 
> I think the problem might be that you are using a wrong version of the 
> reporter.
> 
> You say you used flink-metrics-graphite-1.10.0.jar from 1.10 as a plugin, but 
> it was migrated to plugins in 1.11 only[1].
> 
> I'd recommend trying it out with the same 1.11 version of Flink and Graphite 
> reporter.
> 
> Best,
> 
> Dawid
> 
> [1] https://issues.apache.org/jira/browse/FLINK-16965
> 
> On 26/08/2020 08:04, Vijayendra Yadav wrote:
>> Hi Nikola,
>> 
>> To rule out any other cluster issues, I have tried it in my local now. Steps 
>> as follows, but don't see any metrics yet.
>> 
>> 1) Set up local Graphite 
>> 
>> docker run -d\
>>  --name graphite\
>>  --restart=always\
>>  -p 80:80\
>>  -p 2003-2004:2003-2004\
>>  -p 2023-2024:2023-2024\
>>  -p 8125:8125/udp\
>>  -p 8126:8126\
>>  graphiteapp/graphite-statsd
>> 
>> Mapped Ports
>> 
>> Host  Container  Service
>> 80    80         nginx
>> 2003  2003       carbon receiver - plaintext
>> 2004  2004       carbon receiver - pickle
>> 2023  2023       carbon aggregator - plaintext
>> 2024  2024       carbon aggregator - pickle
>> 8080  8080       Graphite internal gunicorn port (without Nginx proxying).
>> 8125  8125       statsd
>> 8126  8126       statsd admin
>> 
>> 2) WebUI: 
>> 
>> 
>> 
>> 
>> 
>> 3) Run Flink example Job.
>> ./bin/flink run 
>> ./examples/flink-examples-streaming_2.11-1.11-SNAPSHOT-SocketWindowWordCount.jar
>>  --port 
>> 
>> with conf/flink-conf.yaml set as:
>> 
>> metrics.reporter.grph.factory.class: 
>> org.apache.flink.metrics.graphite.GraphiteReporterFactory
>> metrics.reporter.grph.host: localhost
>> metrics.reporter.grph.port: 2003
>> metrics.reporter.grph.protocol: TCP
>> metrics.reporter.grph.interval: 1 SECONDS
>> 
>> and graphite jar:
>> 
>> plugins/flink-metrics-graphite/flink-metrics-graphite-1.10.0.jar
>> 
>> 
>> 4) Can't see any activity in webui graphite. 
>> 
>> 
>> Could you review and let me know what is wrong here ? any other way you 
>> suggest to be able to view the raw metrics data ?
>> Also, do you have sample metrics raw format, you can share from any other 
>> project.
>> 
>> Regards,
>> Vijay
>> 
>> 
>> 
>> 
>> On Sun, Aug 23, 2020 at 9:26 PM Nikola Hrusov  wrote:
>>> Hi Vijay,
>>> 
>>> Your steps look correct to me. 
>>> Perhaps you can double check that the Graphite port you are sending to is 
>>> correct? The default carbon port is 2003 and if you use the aggregator it 
>>> is 2023.
>>> 
>>> You should be able to see in both the Flink jobmanager and taskmanager logs that the 
>>> metrics have been initialized with the config you have pasted.
>>> 
>>> Regards,
>>> Nikola Hrusov
>>> 
>>> 
>>> On Mon, Aug 24, 2020 at 5:00 AM Vijayendra Yadav  
>>> wrote:
 Hi Team,
 
 I am trying  to export Flink stream default metrics using Graphite, but I 
 can't find it in the Graphite metrics console.  Could you confirm the 
 steps below are correct?
 
 1) Updated flink-conf.yaml
 
 metrics.reporter.grph.factory.class: 
 org.apache.flink.metrics.graphite.GraphiteReporterFactory
 metrics.reporter.grph.host: port
 metrics.reporter.grph.port: 9109
 metrics.reporter.grph.protocol: TCP
 metrics.reporter.grph.interval: 30 SECONDS
 
 2) Added Graphite jar in plugin folder :
 
 ll /usr/lib/flink/plugins/metric/
  flink-metrics-graphite-1.10.0.jar
 
 3) Looking metrics in graphite server:
 
 http://port:8080/metrics  
 
 Note: No code change is done.
 
 Regards,
 Vijay
 
 


Resource leak in DataSourceNode?

2020-08-26 Thread Mark Davis
Hi,

I am trying to investigate a problem with non-released resources in my 
application.

I have a stateful application which submits Flink DataSet jobs using code very 
similar to the code in CliFrontend.
I noticed that I am getting a lot of non-closed connections to my data store 
(HBase in my case). The connections are held by the application, not by the jobs 
themselves.

I am using HBaseRowDataInputFormat and it seems that HBase connections opened 
in the configure() method during job graph creation (before the job is 
executed) are not closed. My search led me to the method 
DataSourceNode.computeOperatorSpecificDefaultEstimates(DataStatistics), where I 
see that a format is not closed after being configured.
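
The pattern described above can be reproduced with a small stand-alone sketch. Everything here is hypothetical and only mimics the shape of the problem: FakeConnection stands in for an HBase connection, EagerFormat for an input format that connects in configure(), and estimateWithoutClose for plan-time code that configures a format and drops it. None of these names exist in Flink.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ConfigureLeakSketch {
    // Counts connections that are currently open in this JVM.
    static final AtomicInteger OPEN_CONNECTIONS = new AtomicInteger();

    /** Hypothetical stand-in for a data store connection. */
    static final class FakeConnection implements AutoCloseable {
        FakeConnection() {
            OPEN_CONNECTIONS.incrementAndGet();
        }

        @Override
        public void close() {
            OPEN_CONNECTIONS.decrementAndGet();
        }
    }

    /** Hypothetical format that connects eagerly in configure(). */
    static final class EagerFormat {
        private FakeConnection connection;

        void configure() {
            connection = new FakeConnection();
        }

        void close() {
            if (connection != null) {
                connection.close();
                connection = null;
            }
        }
    }

    /** Stand-in for plan-time estimation: configures the format and never closes it. */
    static void estimateWithoutClose(EagerFormat format) {
        format.configure();
    }

    /** The same call, but the caller releases the format afterwards. */
    static void estimateAndClose(EagerFormat format) {
        format.configure();
        format.close();
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            estimateWithoutClose(new EagerFormat());
        }
        System.out.println("open after unclosed path: " + OPEN_CONNECTIONS.get());
        for (int i = 0; i < 3; i++) {
            estimateAndClose(new EagerFormat());
        }
        System.out.println("open after closed path:   " + OPEN_CONNECTIONS.get());
    }
}
```

In a long-running submitter process each unclosed configure() call adds one connection that is never released, which is consistent with the observation that the connections belong to the application rather than to the jobs.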

Is that correct? How can I overcome this issue?

My application is long-running, which is probably why I observe the resource 
leak. If I spawned a new JVM to run each job, this problem would not be noticeable.

Thank you!

Cheers,
Marc

RE: Example flink run with security options? Running on k8s in my case

2020-08-26 Thread Adam Roberts
Hey Nico - thanks for the prompt response, good catch - I've just tried with the two security options (enabling rest and internal SSL communications) and still hit the same problem
 
I've also tried turning off security (both in my Job definition and in my Flink cluster JobManager/TaskManager settings) and the communication does happen successfully, suggesting all is well otherwise.
 
With regards to testing with just a regular curl, I switched security back on and did the curl, using this:
 
openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes
curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081
curl --cacert rest.pem --cert rest.pem tls-flink-cluster-1-11-jobmanager:8081
 
from the Job CR pod, which is the one that runs flink run against the JobManager I'd like to connect to.
 
That gives 
 
$ openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes
curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081
curl --cacert rest.pem --cert rest.pem tls-flink-cluster-1-11-jobmanager:8081
139676043637888:error:0D07207B:asn1 encoding routines:ASN1_get_object:header too long:../crypto/asn1/asn1_lib.c:101:

so I wonder if my security setup itself is flawed... I'll be happy to share the scripting I use for that if folks feel it'll be of use. Thanks again.
 
- Original message -
From: Nico Kruber
To: user@flink.apache.org
Cc: Adam Roberts
Subject: [EXTERNAL] Re: Example flink run with security options? Running on k8s in my case
Date: Wed, Aug 26, 2020 11:40 AM

Hi Adam,
the flink binary will pick up any configuration from the flink-conf.yaml of
its directory. If that is the same as in the cluster, you wouldn't have to
pass most of your parameters manually. However, if you prefer not having a
flink-conf.yaml in place, you could remove the security.ssl.internal.*
parameter from its call since those only affect internal communication.

If the client's connection to the JM is denied, you would actually have this
in the JM logs as well which you could check.

To check whether your whole setup works, I would suggest to try without
security enabled first and then enable it (just to rule out any other issues)

From the commands you mentioned, it looks like you're just missing
security.ssl.rest.enabled=true and because of that, the client would not use
SSL for the connection.

For more information and setup, I recommend reading through [1] which also
contains an example at the bottom of the page and how to use curl to test or
use the REST endpoint.

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-ssl.html

On Tuesday, 25 August 2020 14:40:04 CEST Adam Roberts wrote:
> Hey everyone, I've been experimenting with Flink
> using https://github.com/GoogleCloudPlatform/flink-on-k8s-operator and I
> believe I've successfully deployed a JobManager and TaskManager with
> security enabled, and a self-signed certificate (the pods come up great).
> However, I can't do much with this - I can't port-forward and access the UI,
> nor can I submit jobs to it by running another pod and using the DNS name
> lookup of the service.
> I always get
>
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph.
> ...
>
> Caused by: org.apache.flink.runtime.rest.ConnectionClosedException: Channel
> became inactive. ... 37 more
>
>
> and this is even with all of the -D security options provided.
>
> The versions of Flink are the same for both my Job and my FlinkCluster
> (1.11.1).
> Is this a sensible thing to do? If I weren't using the operator for example,
> would users be expected to flink run with all of these options?
> Does anything look odd here? My guess is because security's on, the Job
> Manager refuses to talk to my submitter.
> Running as the flink user in the container, I do
>
>>       securityContext:
>>         runAsUser: 
>>         runAsGroup: 
>>       containers:
>>       - name: wordcount
>>         image: adamroberts/mycoolflink:latest
>>         args:
>>         - /opt/flink/bin/flink
>>         - run
>>         - -D
>>         - security.ssl.rest.keystore=/etc/flink-secrets/flink-tls-keystore.key
>>         - -D
>>         - security.ssl.rest.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks
>>         - -D
>>         - security.ssl.rest.keystore-password=thepass # Replace with value of flink-tls-keystore.password
>>         - -D
>>         - security.ssl.rest.key-password=thepass # Replace with value of tls.p12.password
>>         - -D
>>         - security.ssl.rest.truststore-password=thepass # Replace with value of flink-tls-ca.truststore.password
>>         - -D
>>         - security.ssl.internal.keystore=/etc/flink-secrets/flink-tls-keystore.key
>>         - -D
>>         - security.ssl.inte

Re: How to visit outer service in batch for sql

2020-08-26 Thread Danny Chan
Hi, have you tried defining a UDAF in your group window SQL? Inside the UDAF 
you can call your custom service.

I'm afraid you are right: SQL only supports time windows.

Best,
Danny Chan
On Aug 26, 2020, at 8:02 PM +0800, 刘建刚 wrote:
> With the DataStream API, we can call an external service in batches through
> countWindow, as shown below, so that the service is called once every 1000
> records. Calling the external service for every record would make our job very slow.
> source.keyBy(new KeySelector())
>     .countWindow(1000)
>     .apply((WindowFunction)
>         (s, globalWindow, values, collector) -> {
>             List<MyType> resultList = service.visit(values);
>             for (MyType result : resultList) {
>                 if (result.ok) {
>                     collector.collect(result);
>                 }
>             }
>         });
> But how can I write SQL to implement the batch logic? I can use a UDF to
> call the external service. Currently, Flink SQL only supports time windows, not
> count windows. I also checked the UDF wiki but found it hard to batch records.
> Any suggestion is welcome. Thank you.
>
>
>
>
>
>


Re: Window grouping after a Flink interval join

2020-08-26 Thread Danny Chan
For SQL, we always hold back the watermark when we emit the elements. For a 
time interval join, the delay is:

return Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness;

In your case, the watermark is held back by 1 hour, so the left join 
records are not late when they are used by subsequent operators.

See KeyedCoProcessOperatorWithWatermarkDelay and 
RowTimeIntervalJoin.getMaxOutputDelay for details.
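
The numbers from the question quoted below can be checked with a short calculation. This is only arithmetic on the two formulas cited in this thread, not Flink code: the class and method names are made up, allowedLateness is assumed to be 0, and the sketch assumes an unmatched left row is emitted once the input watermark reaches cleanUpTime. Whether the operator really behaves that way is what the follow-up in this thread (FLINK-18996) discusses.

```java
public class IntervalJoinDelaySketch {

    // Clean-up time formula for the left stream as quoted in the question (milliseconds).
    static long cleanUpTime(long rowTime, long leftRelativeSize, long rightRelativeSize,
                            long allowedLateness) {
        return rowTime + leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2
                + allowedLateness + 1;
    }

    // Watermark hold-back as quoted in the reply (milliseconds).
    static long watermarkDelay(long leftRelativeSize, long rightRelativeSize,
                               long allowedLateness) {
        return Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness;
    }

    public static void main(String[] args) {
        long hour = 60L * 60L * 1000L;
        long rowTime = 0L;

        // b.rowtime BETWEEN a.rowtime AND a.rowtime + INTERVAL '1' HOUR
        long cleanUp = cleanUpTime(rowTime, hour, 0L, 0L);
        long delay = watermarkDelay(hour, 0L, 0L);

        // Assumption: downstream watermark = input watermark - delay at emission time.
        long downstreamWatermark = cleanUp - delay;

        System.out.println("cleanUpTime (ms):          " + cleanUp);
        System.out.println("watermark delay (ms):      " + delay);
        System.out.println("downstream watermark (ms): " + downstreamWatermark);
        System.out.println("ahead of rowTime by (ms):  " + (downstreamWatermark - rowTime));
    }
}
```

Under these assumptions the downstream watermark is about half an hour ahead of the row's own rowtime when the unmatched row is emitted, which is the gap the question asks about.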

Best,
Danny Chan
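On Jul 3, 2020, at 3:29 PM +0800, 元始(Bob Hu) <657390...@qq.com> wrote:
> Hello, I would like to ask a question:
> Is there a problem with doing a window group by after an interval join of two
> streaming tables in Flink? Some left join rows that find no match get dropped.
> For example, with the join condition select * from a,b where a.id=b.id and
> b.rowtime between a.rowtime and a.rowtime + INTERVAL '1' HOUR, the source
> code gives leftRelativeSize = 1 hour and rightRelativeSize = 0, and for the
> left stream cleanUpTime = rowTime + leftRelativeSize + (leftRelativeSize +
> rightRelativeSize) / 2 + allowedLateness + 1. Unmatched rows from the left
> table are therefore emitted after 1.5 hours (with the right side null), while
> the watermark adjustment is Math.max(leftRelativeSize, rightRelativeSize) +
> allowedLateness, i.e. 1 hour. Doesn't that mean that by the time the row is
> emitted, the watermark is already 0.5 hours past the left row's rowtime? A later
> group by on the joined stream then drops these rows whose right side is null.
> Flink version: 1.10.0.
>
> Below is a piece of my test code: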
> import org.apache.commons.net.ntp.TimeStamp;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import 
> org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.ScalarFunction;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.IOUtils;
>
> import java.io.BufferedReader;
> import java.io.InputStreamReader;
> import java.io.Serializable;
> import java.net.InetSocketAddress;
> import java.net.Socket;
> import java.sql.Timestamp;
> import java.text.SimpleDateFormat;
> import java.util.ArrayList;
> import java.util.Date;
> import java.util.List;
>
> public class TimeBoundedJoin {
>
>     public static AssignerWithPeriodicWatermarks<Row> getWatermark(Integer maxIdleTime,
>             long finalMaxOutOfOrderness) {
>         AssignerWithPeriodicWatermarks<Row> timestampExtractor =
>                 new AssignerWithPeriodicWatermarks<Row>() {
>             private long currentMaxTimestamp = 0;
>             private long lastMaxTimestamp = 0;
>             private long lastUpdateTime = 0;
>             boolean firstWatermark = true;
>             // Integer maxIdleTime = 30;
>
>             @Override
>             public Watermark getCurrentWatermark() {
>                 if (firstWatermark) {
>                     lastUpdateTime = System.currentTimeMillis();
>                     firstWatermark = false;
>                 }
>                 if (currentMaxTimestamp != lastMaxTimestamp) {
>                     lastMaxTimestamp = currentMaxTimestamp;
>                     lastUpdateTime = System.currentTimeMillis();
>                 }
>                 // No new maximum timestamp for maxIdleTime seconds: fall back to wall-clock time.
>                 if (maxIdleTime != null
>                         && System.currentTimeMillis() - lastUpdateTime > maxIdleTime * 1000) {
>                     return new Watermark(new Date().getTime() - finalMaxOutOfOrderness * 1000);
>                 }
>                 return new Watermark(currentMaxTimestamp - finalMaxOutOfOrderness * 1000);
>             }
>
>             @Override
>             public long extractTimestamp(Row row, long previousElementTimestamp) {
>                 // Field 1 holds the event time, either as a long or as a java.sql.Timestamp.
>                 Object value = row.getField(1);
>                 long timestamp;
>                 try {
>                     timestamp = (long) value;
>                 } catch (Exception e) {
>                     timestamp = ((Timestamp) value).getTime();
>                 }
>                 if (timestamp > currentMaxTimestamp) {
>                     currentMaxTimestamp = timestamp;
>                 }
>                 return timestamp;
>             }
>         };
>         return timestampExtractor;
>     }
>
>public static void main(String[] args) throws Exception {
>StreamExecutionEnvironment bsEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>StreamTableEnvironment bsTableEnv = 
> StreamTableEnvironment.create(bsEnv, bsSettings);
>bsEnv.setParallelism(1);
>bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
> //DataStream ds1 = bsEnv.addSource(sourceFunction(9000));
>SimpleDateFormat sdf = new SimpleDateFormat("-MM-dd HH:mm:s

How to visit outer service in batch for sql

2020-08-26 Thread 刘建刚
  With the DataStream API, we can call an external service in batches through
countWindow, as shown below, so that the service is called once every 1000
records. Calling the external service for every record would make our job very slow.

source.keyBy(new KeySelector())
    .countWindow(1000)
    .apply((WindowFunction)
        (s, globalWindow, values, collector) -> {
            List<MyType> resultList = service.visit(values);
            for (MyType result : resultList) {
                if (result.ok) {
                    collector.collect(result);
                }
            }
        });

  But how can I write SQL to implement the batch logic? I can use a UDF
to call the external service. Currently, Flink SQL only supports time windows,
not count windows. I also checked the UDF wiki but found it hard to batch records.
  Any suggestion is welcome. Thank you.
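
The batching idea itself can be sketched without Flink. The snippet below is an illustration only: CountBatcherSketch is a made-up helper, the service is modelled as a plain Function over a list, and nothing here is a Flink SQL or UDF API. It buffers records and calls the service once per full batch, which is the behaviour the countWindow(1000) pipeline above relies on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class CountBatcherSketch<I, O> {
    private final int batchSize;
    private final Function<List<I>, List<O>> service;
    private final List<I> buffer = new ArrayList<>();

    public CountBatcherSketch(int batchSize, Function<List<I>, List<O>> service) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.service = service;
    }

    /** Buffers one record; returns the service results once a batch is full, else an empty list. */
    public List<O> add(I record) {
        buffer.add(record);
        if (buffer.size() < batchSize) {
            return new ArrayList<>();
        }
        List<I> batch = new ArrayList<>(buffer);
        buffer.clear();
        return service.apply(batch);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical service: counts invocations and echoes the batch back.
        CountBatcherSketch<Integer, Integer> batcher =
                new CountBatcherSketch<>(1000, batch -> {
                    calls[0]++;
                    return batch;
                });
        int emitted = 0;
        for (int i = 0; i < 2500; i++) {
            emitted += batcher.add(i).size();
        }
        System.out.println(calls[0] + " service calls, " + emitted + " records emitted");
    }
}
```

Note that, like a pure count window, this sketch keeps an incomplete final batch in the buffer; a real job would also need a time-based flush for the leftover records.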


Re: [ANNOUNCE] Apache Flink 1.10.2 released

2020-08-26 Thread Leonard Xu
Thanks ZhuZhu for being the release manager and everyone who contributed to 
this release.

Best,
Leonard



Re: [ANNOUNCE] Apache Flink 1.10.2 released

2020-08-26 Thread Congxian Qiu
Thanks ZhuZhu for managing this release and everyone else who contributed
to this release!

Best,
Congxian


On Wed, Aug 26, 2020 at 1:53 PM Xingbo Huang wrote:

> Thanks Zhu for the great work and everyone who contributed to this release!
>
> Best,
> Xingbo
>
> On Wed, Aug 26, 2020 at 12:43 PM Guowei Ma wrote:
>
>> Hi,
>>
>> Thanks a lot for being the release manager Zhu Zhu!
>> Thanks everyone contributed to this!
>>
>> Best,
>> Guowei
>>
>>
>> On Wed, Aug 26, 2020 at 11:18 AM Yun Tang  wrote:
>>
>>> Thanks for Zhu's work to manage this release and everyone who
>>> contributed to this!
>>>
>>> Best,
>>> Yun Tang
>>> 
>>> From: Yangze Guo 
>>> Sent: Tuesday, August 25, 2020 14:47
>>> To: Dian Fu 
>>> Cc: Zhu Zhu ; dev ; user <
>>> user@flink.apache.org>; user-zh 
>>> Subject: Re: [ANNOUNCE] Apache Flink 1.10.2 released
>>>
>>> Thanks a lot for being the release manager Zhu Zhu!
>>> Congrats to all others who have contributed to the release!
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Tue, Aug 25, 2020 at 2:42 PM Dian Fu  wrote:
>>> >
>>> > Thanks ZhuZhu for managing this release and everyone else who
>>> > contributed to this release!
>>> >
>>> > Regards,
>>> > Dian
>>> >
>>> > On Aug 25, 2020, at 2:22 PM, Till Rohrmann wrote:
>>> >
>>> > Great news. Thanks a lot for being our release manager Zhu Zhu and to
>>> > all others who have contributed to the release!
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Tue, Aug 25, 2020 at 5:37 AM Zhu Zhu  wrote:
>>> >>
>>> >> The Apache Flink community is very happy to announce the release of
>>> >> Apache Flink 1.10.2, which is the first bugfix release for the Apache Flink
>>> >> 1.10 series.
>>> >>
>>> >> Apache Flink® is an open-source stream processing framework for
>>> >> distributed, high-performing, always-available, and accurate data streaming
>>> >> applications.
>>> >>
>>> >> The release is available for download at:
>>> >> https://flink.apache.org/downloads.html
>>> >>
>>> >> Please check out the release blog post for an overview of the
>>> >> improvements for this bugfix release:
>>> >> https://flink.apache.org/news/2020/08/25/release-1.10.2.html
>>> >>
>>> >> The full release notes are available in Jira:
>>> >> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12347791
>>> >>
>>> >> We would like to thank all contributors of the Apache Flink community
>>> >> who made this release possible!
>>> >>
>>> >> Thanks,
>>> >> Zhu
>>> >
>>> >
>>>
>>


Re: Monitor the usage of keyed state

2020-08-26 Thread Yun Tang
Hi Mu

I want to share something more about the memory usage of RocksDB.

If you enable managed memory for RocksDB (which is enabled by default) [1], you 
should look at the block cache usage, because index and filter blocks are kept 
in the cache and write buffer usage is also counted against it.
The block cache usage metric [2] therefore tells you the overall memory usage 
of RocksDB.
BTW, since the RocksDB instances within one slot share the same cache, you 
might notice that all RocksDB instances in the same slot report the same block 
cache usage. Please do not sum them up.
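
To make the last point concrete, here is a minimal sketch in plain Java (no Flink API involved). The class name, the slot names and the numbers are made up for illustration. It assumes, as described above, that every RocksDB instance in a slot reports the same shared-cache value, so the cache is counted once per slot instead of once per instance.

```java
import java.util.List;
import java.util.Map;

class BlockCacheUsageSketch {

    // Naive aggregation: adds up every reported value, which counts the
    // shared cache once per RocksDB instance.
    static long naiveSum(Map<String, List<Long>> reportedBySlot) {
        long total = 0;
        for (List<Long> values : reportedBySlot.values()) {
            for (long value : values) {
                total += value;
            }
        }
        return total;
    }

    // Counts the shared cache once per slot. All values of a slot are
    // expected to be equal; the maximum is taken to be safe.
    static long perSlotTotal(Map<String, List<Long>> reportedBySlot) {
        long total = 0;
        for (List<Long> values : reportedBySlot.values()) {
            long slotUsage = 0;
            for (long value : values) {
                slotUsage = Math.max(slotUsage, value);
            }
            total += slotUsage;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical readings: three instances in slot-0, two in slot-1.
        Map<String, List<Long>> reported = Map.of(
                "slot-0", List.of(64L, 64L, 64L),
                "slot-1", List.of(32L, 32L));
        System.out.println("naive sum:      " + naiveSum(reported));
        System.out.println("per-slot total: " + perSlotTotal(reported));
    }
}
```

With these made-up readings the naive sum is 256, while counting once per slot gives 96.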

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#state-backend-rocksdb-memory-managed
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#state-backend-rocksdb-metrics-block-cache-usage

Best,
Yun Tang

From: Andrey Zagrebin 
Sent: Tuesday, August 25, 2020 22:12
To: Mu Kong 
Cc: flink-u...@apache.org 
Subject: Re: Monitor the usage of keyed state

Hi Mu,

I would suggest looking into the RocksDB native metrics, which you can enable 
as Flink metrics [1]

Best,
Andrey

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#rocksdb-native-metrics

On Fri, Aug 21, 2020 at 4:27 AM Mu Kong <kong.mu@gmail.com> wrote:
Hi community,

I have a Flink job running with RichMapFunction that uses keyed state.
Although the TTL is enabled, I wonder if there is a way that I can monitor the 
memory usage of the keyed state. I'm using RocksDB as the state backend.

Best regards,
Mu


Re: Example flink run with security options? Running on k8s in my case

2020-08-26 Thread Nico Kruber
Hi Adam,
the flink binary will pick up any configuration from the flink-conf.yaml of 
its directory. If that is the same as in the cluster, you wouldn't have to 
pass most of your parameters manually. However, if you prefer not having a 
flink-conf.yaml in place, you could remove the security.ssl.internal.* 
parameters from the call, since those only affect internal communication.

If the client's connection to the JM is denied, you would actually have this 
in the JM logs as well which you could check.

To check whether your whole setup works, I would suggest trying without 
security enabled first and then enabling it (just to rule out any other issues).

From the commands you mentioned, it looks like you're just missing 
security.ssl.rest.enabled=true and because of that, the client would not use 
SSL for the connection.
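
As a rough sketch of that suggestion (plain Java that only assembles the argument list, it does not invoke Flink): the helper `buildArgs` and the class name are made up for illustration, the paths and host name are copied from the original command, and the option list is abbreviated. Which other options the client needs depends on the setup.

```java
import java.util.ArrayList;
import java.util.List;

class RestSslArgsSketch {

    // Builds a "flink run" argument list in the same shape as the original
    // command: every option is passed as a "-D" entry followed by key=value.
    static List<String> buildArgs(List<String> options, String jobManager, String jar) {
        List<String> args = new ArrayList<>();
        args.add("/opt/flink/bin/flink");
        args.add("run");
        for (String option : options) {
            args.add("-D");
            args.add(option);
        }
        args.add("-m");
        args.add(jobManager);
        args.add(jar);
        return args;
    }

    public static void main(String[] args) {
        List<String> options = List.of(
                // The option that appears to be missing in the original command.
                "security.ssl.rest.enabled=true",
                "security.ssl.rest.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks",
                "security.ssl.rest.truststore-password=thepass");
        System.out.println(String.join(" ", buildArgs(
                options,
                "tls-flink-cluster-1-11-jobmanager:8081",
                "/opt/flink/examples/batch/WordCount.jar")));
    }
}
```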

For more information and setup, I recommend reading through [1] which also 
contains an example at the bottom of the page and how to use curl to test or 
use the REST endpoint.


Nico


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-ssl.html

On Tuesday, 25 August 2020 14:40:04 CEST Adam Roberts wrote:
> Hey everyone, I've been experimenting with Flink
> using https://github.com/GoogleCloudPlatform/flink-on-k8s-operator and I
> believe I've successfully deployed a JobManager and TaskManager with
> security enabled, and a self-signed certificate (the pods come up great). 
> However, I can't do much with this - I can't port-forward and access the UI,
> nor can I submit jobs to it by running another pod and using the DNS name
> lookup of the service. 
> I always get
>  
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph. 
> ...
>  
> Caused by: org.apache.flink.runtime.rest.ConnectionClosedException: Channel
> became inactive. ... 37 more
>  
>  
> and this is even with all of the -D security options provided.
>  
> The versions of Flink are the same for both my Job and my FlinkCluster
> (1.11.1). 
> Is this a sensible thing to do? If I weren't using the operator for example,
> would users be expected to flink run with all of these options? 
> Does anything look odd here? My guess is because security's on, the Job
> Manager refuses to talk to my submitter. 
> Running as the flink user in the container, I do
>  
>   securityContext:
>     runAsUser: 
>     runAsGroup: 
>   containers:
>   - name: wordcount
>     image: adamroberts/mycoolflink:latest
>     args:
>     - /opt/flink/bin/flink
>     - run
>     - -D
>     - security.ssl.rest.keystore=/etc/flink-secrets/flink-tls-keystore.key
>     - -D
>     - security.ssl.rest.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks
>     - -D
>     - security.ssl.rest.keystore-password=thepass # Replace with value of flink-tls-keystore.password
>     - -D
>     - security.ssl.rest.key-password=thepass # Replace with value of tls.p12.password
>     - -D
>     - security.ssl.rest.truststore-password=thepass # Replace with value of flink-tls-ca.truststore.password
>     - -D
>     - security.ssl.internal.keystore=/etc/flink-secrets/flink-tls-keystore.key
>     - -D
>     - security.ssl.internal.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks
>     - -D
>     - security.ssl.internal.keystore-password=thepass # Replace with value of flink-tls-keystore.password
>     - -D
>     - security.ssl.internal.key-password=thepass # Replace with value of flink-tls-keystore.password
>     - -D
>     - security.ssl.internal.truststore-password=thepass # Replace with value of flink-tls-truststore.password
>     - -m
>     - tls-flink-cluster-1-11-jobmanager:8081
>     - /opt/flink/examples/batch/WordCount.jar
>     - --input
>     - /opt/flink/NOTICE
>  
> with the secrets mounted in at the above location (if I exec into my
> container, I can see they're all there OK). Note that it is a read-only
> file system. 
> adamroberts/mycoolflink (at the time of this email) is just based
> on https://github.com/apache/flink-docker. 
> Thanks!
>  
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
> PO6 3AU





Re: Idle stream does not advance watermark in connected stream

2020-08-26 Thread Dawid Wysakowicz
Hi Kien,

I am afraid this is a valid bug. I am not 100% sure, but the way I
understand the code, the idleness mechanism applies to input channels,
i.e. it covers the case where multiple parallel instances shuffle their
results to downstream operators.

In case of a two input operator, combining the watermark of two
different upstream operators happens inside of the operator itself.
There we do not have the idleness status. We do not have a status that a
whole upstream operator became idle. That's definitely a bug/limitation.
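
As a rough illustration of this analysis, here is a toy model in plain Java. It is not Flink's actual operator code; the `Input` class and both method names are made up for this sketch. A two-input operator that combines watermarks as the minimum of its inputs stays stuck when one input never advances, whereas a combination that knew about idleness could skip the idle input.

```java
import java.util.OptionalLong;

class TwoInputWatermarkSketch {

    // Toy model of one input of a two-input operator (not a Flink class).
    static final class Input {
        long watermark = Long.MIN_VALUE; // no watermark received yet
        boolean idle = false;
    }

    // Combines the watermarks as the minimum over both inputs, with no
    // knowledge of idleness: an input that never advances holds back the output.
    static long combineIgnoringIdleness(Input a, Input b) {
        return Math.min(a.watermark, b.watermark);
    }

    // Combines the watermarks over active inputs only.
    // Returns empty if both inputs are idle.
    static OptionalLong combineIdleAware(Input a, Input b) {
        if (a.idle && b.idle) {
            return OptionalLong.empty();
        }
        if (a.idle) {
            return OptionalLong.of(b.watermark);
        }
        if (b.idle) {
            return OptionalLong.of(a.watermark);
        }
        return OptionalLong.of(Math.min(a.watermark, b.watermark));
    }

    public static void main(String[] args) {
        Input active = new Input();
        active.watermark = 1_000L;

        Input idleInput = new Input();
        idleInput.idle = true; // never emitted a watermark

        System.out.println("ignoring idleness: " + combineIgnoringIdleness(active, idleInput));
        System.out.println("idle-aware:        " + combineIdleAware(active, idleInput));
    }
}
```

With one active input at watermark 1000 and one idle input that never emitted a watermark, the first method stays at Long.MIN_VALUE, while the idle-aware variant yields 1000.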

I'm also cc'ing Aljoscha who could maybe confirm my analysis.

Best,

Dawid

On 24/08/2020 16:00, Truong Duc Kien wrote:
> Hi all,
> We are testing the new Idleness detection feature in Flink 1.11,
> however, it does not work as we expected:
> When we connect two data streams, of which one is idle, the output
> watermark of the CoProcessOperator does not increase, hence the program
> cannot progress.
>
> I've made a small project to illustrate the problem. The watermark
> received by the sink does not increase at all until the idle source is
> stopped.
>
> https://github.com/kien-truong/flink-idleness-testing
>
> Is this a bug or does the idleness detection not support this use case ?
>
> Regards.
> Kien





Re: Loading FlinkKafkaProducer fails with LinkError

2020-08-26 Thread Arvid Heise
Hi,

@Chesnay Schepler: The issue is that the uber-jar is
first loaded with Flink's app classloader (because it's in lib) and then,
when the application starts, it gets loaded again in the ChildFirstCL;
since that loader is child-first, the class is loaded a second time anyway.

What I don't quite understand is why the Kafka class was loaded through the
app classloader to begin with. Since Yuval mentioned that it happens only
during restoration, I'm suspecting that Flink is not using the correct
classloader at some point. Unfortunately, I don't know an easy way to trace
the first loading (we have the stack trace for the second
loading, but I think that one is legitimate).

On Tue, Aug 25, 2020 at 11:24 PM Yuval Itzchakov  wrote:

> They are definitely equal, the same JAR is copied in subsequent lines in
> the Dockerfile.
>
> Regarding the NoSuchMethodException, I'll look it up and let you know
> tomorrow.
>
> On Tue, Aug 25, 2020, 22:59 Chesnay Schepler  wrote:
>
>> The simplest answer is that they are in fact not equal; maybe it is a jar
>> of an older version of your setup?
>>
>> Can you give some details on the NoSuchMethodException? Specifically
>> whether it tries to access something from the Kafka connector, or from your
>> own user code.
>>
>> On 25/08/2020 21:27, Yuval Itzchakov wrote:
>>
>> OK, I think I figured it out. It looks like the uber-jar is also being
>> placed under `lib`, which is probably the cause of the problem.
>>
>> Question is, why does it identify it as two different versions? It's
>> exactly the same JAR.
>>
>> On Tue, Aug 25, 2020 at 10:22 PM Yuval Itzchakov 
>> wrote:
>>
>>> I'm afraid it's not being printed out due to different log levels :(
>>>
>>> Yes, I build the image myself. It takes the tar file from
>>> https://archive.apache.org/dist/flink/flink-1.9.0/
>>>  and unpacks it
>>> into the image.
>>> I ran:
>>>
>>> find . -iname "*.jar" | xargs -n 1 jar tf | grep -i producerrecord
>>> find . -iname "*.jar" | xargs -n 1 jar tf | grep -i kafka
>>>
>>> Both were run from within /lib, and both produce no results.
>>>
>>> On Tue, Aug 25, 2020 at 10:07 PM Chesnay Schepler 
>>> wrote:
>>>
>>>> The NoSuchMethodException shows that the class is still on the
>>>> classpath, but with a different version than your code is expecting.
>>>> Otherwise you would've gotten a different error.
>>>> This implies that there are 2 versions of the kafka dependencies on the
>>>> classpath in your original run; it suddenly working with parent-first
>>>> classloading reinforces the suspicion that they are present in the
>>>> distribution.
>>>>
>>>> As Arvid mentioned, the classpath log entry (at the very start of the
>>>> log file) would be interesting.
>>>>
>>>> Did you build the Flink distribution yourself, or are you relying on
>>>> one of the existing Flink binaries/images?
>>>>
>>>> On 25/08/2020 20:51, Yuval Itzchakov wrote:
>>>>
>>>> Hi Arvid,
>>>> I'm running Flink in a job cluster on k8s using the Lyft Operator.
>>>>
>>>> The flink image that I'm building does not have the
>>>> flink-connector-kafka library in its JAR, I've made sure of this using
>>>> `jar -tf`. Additionally, once I removed the dependency from my uber jar, it
>>>> failed with a "NoSuchMethodException" at runtime for one of the arbitrary
>>>> methods.
>>>>
>>>> I used classloader.resolve-order: parent-first and it resolved the
>>>> issue somehow. I still don't know why though.
>>>>
>>>> On Tue, Aug 25, 2020 at 6:13 PM Arvid Heise
>>>> wrote:
>>>>
>>>>> Hi Yuval,
>>>>>
>>>>> How do you execute Flink? Can you show us the log entry with the
>>>>> classpath?
>>>>>
>>>>> I'm guessing that you have Kafka bundled in your uber-jar and
>>>>> additionally also have the connector in flink-dist/lib. If so, you simply
>>>>> need to remove it in one place. In general, if you use flink-dist, you'd
>>>>> not bundle any Flink dependencies in your uber-jar (use provided scope for
>>>>> them).
>>>>>
>>>>> If you have everything bundled in one uber-jar and execute it somehow
>>>>> without flink-dist, then I don't immediately see a solution. Then the log
>>>>> with the classpath would help.
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
>>>>>
>>>>>
>>>>> On Sun, Aug 23, 2020 at 1:37 PM Yuval Itzchakov
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I'm trying to load a FlinkKafkaProducer sink alongside another custom
>>>>>> sink. While trying to restore
>>>>>> a running Flink app from the previous state, I get the error message
>>>>>> below.
>>>>>> I am running Flink 1.9.0 with the following SBT dependency added:
>>>>>> "org.apache.flink" %% "flink-connector-kafka" % 1.9.0
>>>>>> And the app is deployed via a standard uber jar with all the
>>>>>> dependencies.
>>>>>> Would appreciate the help
>>>>>> java.lang.LinkageError: loader constraint violation: loader
>>>>>> (instance of org/apache/flink/util/ChildFirstClassLoader) previously
>>>>>> initiated loading for a different type with name
>>>>>> "org/apache/kafka/clients/producer/Pro

Re: Why consecutive calls of orderBy are forbidden?

2020-08-26 Thread Dawid Wysakowicz
Hi,

I think you are hitting a bug here. What you are trying to do should be
possible. Would you like to open a bug report for it?

However, the bug applies to the legacy batch planner (you are using the
BatchTableEnvironment), which is no longer maintained and there were
discussions already to drop it in one of the upcoming releases (maybe
even 1.12). I'd suggest upgrading to the Blink planner.

Moreover, as a workaround, you could apply the second orderBy to the
non-sorted table. There is no point in applying two different orderings,
as the second one will just override the order from the first sorting.
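
A small sketch of that last point in plain Java collections (not the Table API; the `Row` class and the sample values are made up). Sorting by `cola` and then by `colb` gives the same result as sorting by `colb` alone. The sample uses distinct `colb` values on purpose: Java's list sort is stable and would keep the earlier order among ties, which a distributed sort need not do.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class ConsecutiveSortSketch {

    // Hypothetical row with two sortable columns, mirroring "cola" and "colb".
    static final class Row {
        final int cola;
        final int colb;

        Row(int cola, int colb) {
            this.cola = cola;
            this.colb = colb;
        }

        @Override
        public String toString() {
            return "(" + cola + "," + colb + ")";
        }
    }

    // Returns a sorted copy and leaves the input untouched.
    static List<Row> sortedBy(List<Row> rows, Comparator<Row> order) {
        List<Row> copy = new ArrayList<>(rows);
        copy.sort(order);
        return copy;
    }

    public static void main(String[] args) {
        Comparator<Row> byColA = Comparator.comparingInt(row -> row.cola);
        Comparator<Row> byColB = Comparator.comparingInt(row -> row.colb);

        List<Row> rows = List.of(new Row(1, 30), new Row(2, 10), new Row(3, 20));

        // Two consecutive sorts versus a single sort on the second column.
        List<Row> sortedTwice = sortedBy(sortedBy(rows, byColA), byColB);
        List<Row> sortedOnce = sortedBy(rows, byColB);

        System.out.println("cola, then colb: " + sortedTwice);
        System.out.println("colb only:       " + sortedOnce);
    }
}
```

Both lists come out in the same `colb` order, so the first sort adds nothing. If both columns should matter, a single ordering on both (in Java, `byColA.thenComparing(byColB)`) expresses that directly.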

Best,

Dawid

On 24/08/2020 13:26, 洪帆(既起) wrote:
> Hi, all.
> I tried calling two consecutive orderBy for a Table, but got an exception.
> Can anyone explain why this happens? 
> In my mind, orderBy should be callable on any Table. But
> obviously it is not, and there is no explanation why.
>
> Here is a simplified version of code:
>
> Table table = btenv.scan("source").orderBy("cola");
> table.insertInto("sink");
> Table table2 = table.orderBy("colb");
> table2.insertInto("sink2");
> btenv.execute("testest");
>
> The exception is as follows:
>
> java.lang.NullPointerException
>  at org.apache.flink.optimizer.dag.SingleInputNode.computeInterestingPropertiesForInputs(SingleInputNode.java:224)
>  at org.apache.flink.optimizer.traversals.InterestingPropertyVisitor.preVisit(InterestingPropertyVisitor.java:51)
>  at org.apache.flink.optimizer.traversals.InterestingPropertyVisitor.preVisit(InterestingPropertyVisitor.java:29)
>  at org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:513)
>  at org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
>  at org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
>  at org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
>  at org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
>  at org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
>  at org.apache.flink.optimizer.dag.DataSinkNode.accept(DataSinkNode.java:248)
>  at org.apache.flink.optimizer.dag.TwoInputNode.accept(TwoInputNode.java:751)
>  at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:493)
>  at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:399)
>  at org.apache.flink.test.util.TestEnvironment.compileProgram(TestEnvironment.java:132)
>  at org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:105)
>  at org.apache.flink.table.api.internal.BatchTableEnvImpl.execute(BatchTableEnvImpl.scala:225)
>  at com.alibaba.alink.operator.batch.dataproc.SqlBatchOpsTest.testOrderBy(SqlBatchOpsTest.java:306)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>  at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>  at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>  at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>  at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>  at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>  at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>  at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>  at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>  at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>  at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>  at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
>  at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
>


signature.asc
Description: Open