Re: Metrics OOM java heap space

2022-08-15 Thread Chesnay Schepler
The granularity setting isn't relevant here; it only matters when latency metrics are 
enabled, but those are opt-in and the default configuration is used.
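
For context, latency tracking would only kick in with something like the following in the 
configuration (a sketch; with the defaults it stays off):

    metrics.latency.interval: 60000        # ms between latency markers; 0/unset = disabled
    metrics.latency.granularity: operator  # single | operator | subtask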


Enabling/disabling specific metrics will only be possible in the upcoming 1.16.0.

@Yuriy: You said you had 270k Strings in the StreamConfig; is that 
accurate? How many StreamConfig instances are there anyhow? Asking since 
that is a strange number to have.
I wouldn't conclude that metrics are the problem; it could just be that 
you're already running close to the memory budget, and the additional 
memory required by metrics just ever so slightly pushes you over it.


On 14/08/2022 10:41, yu'an huang wrote:
You can follow the ticket 
https://issues.apache.org/jira/browse/FLINK-10243 mentioned in that 
Stack Overflow question to set this parameter:


"metrics.latency.granularity": 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#metrics-latency-granularity



You only have 1.688 GB for your TaskManager. I also suggest increasing the 
memory configuration, otherwise the test may still fail.
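
For example, in your docker-compose file you could raise the total process size via 
FLINK_PROPERTIES (the 4g below is just an illustrative figure, not a recommendation; pick 
whatever fits your host):

  taskmanager:
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        taskmanager.memory.process.size: 4g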





On 12 Aug 2022, at 10:52 PM, Yuriy Kutlunin wrote:


Hello Yuan,

I don't override any default settings, docker-compose.yml:
services:
  jobmanager:
    image: flink:1.15.1-java11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:1.15.1-java11
    depends_on:
      - jobmanager
    command: taskmanager
    ports:
      - "8084:8084"
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

        env.java.opts: -XX:+HeapDumpOnOutOfMemoryError

From the TaskManager log:
INFO  [] - Final TaskExecutor Memory configuration:
INFO  [] -   Total Process Memory:          1.688gb (1811939328 bytes)
INFO  [] -     Total Flink Memory:          1.250gb (1342177280 bytes)
INFO  [] -       Total JVM Heap Memory:     512.000mb (536870912 bytes)
INFO  [] -         Framework:               128.000mb (134217728 bytes)
INFO  [] -         Task:                    384.000mb (402653184 bytes)
INFO  [] -       Total Off-heap Memory:     768.000mb (805306368 bytes)
INFO  [] -         Managed:                 512.000mb (536870912 bytes)
INFO  [] -         Total JVM Direct Memory: 256.000mb (268435456 bytes)
INFO  [] -           Framework:             128.000mb (134217728 bytes)
INFO  [] -           Task:                  0 bytes
INFO  [] -           Network:               128.000mb (134217728 bytes)
INFO  [] -     JVM Metaspace:               256.000mb (268435456 bytes)
INFO  [] -     JVM Overhead:                192.000mb (201326592 bytes)

I would prefer not to configure memory (at this point), because 
memory consumption depends on the job structure, so it can always exceed 
whatever values are configured.


My next guess is that the problem is not the content of the metrics but 
their number, which grows with the number of operators.
So the next question is whether there is a way to disable metric 
generation at the operator level.

I found the same question, without an accepted answer, on Stack Overflow:
https://stackoverflow.com/questions/54215245/apache-flink-limit-the-amount-of-metrics-exposed

On Fri, Aug 12, 2022 at 4:05 AM yu'an huang  wrote:
Hi Yuriy,

How do you set your TaskManager memory? I think 40MB is not 
significantly high for Flink. It’s also normal to see memory usage increase if 
you have more parallelism or turn more metrics on. You can try 
setting larger memory for Flink as explained in the following documentation.


https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup/

Best
Yuan



On 12 Aug 2022, at 12:51 AM, Yuriy Kutlunin wrote:


Hi all,

I'm running a Flink cluster in session mode via docker-compose as 
described in the docs:

https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#session-cluster-yml

After submitting a test job with many intermediate SQL operations 
(~500 select * from ...) and metrics turned on (JMX or Prometheus) I 
got OOM: java heap space on initialization stage.
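
A rough sketch of the shape of such a job (illustrative only - the table names and the 
datagen/blackhole connectors are placeholders, not the real pipeline):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ManySelectsJob {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Placeholder source; the real job reads actual data.
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE source1 (id BIGINT, v STRING) WITH ('connector' = 'datagen')");

        // ~500 trivial intermediate SELECTs, each adding another chained operator.
        String current = "source1";
        for (int i = 0; i < 500; i++) {
            String next = "view_" + i;
            tEnv.createTemporaryView(next, tEnv.sqlQuery("SELECT * FROM " + current));
            current = next;
        }

        // Placeholder sink.
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE sink1 (id BIGINT, v STRING) WITH ('connector' = 'blackhole')");
        tEnv.from(current).executeInsert("sink1");
    }
}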


Turning metrics off allows the job to reach the RUNNING state.
Heap consumption also depends on parallelism - the same job succeeds 
when submitted with parallelism 1 instead of 2.


There are Task Manager logs for 4 cases:
JMX parallelism 1 (succeeded)
JMX parallelism 2 (failed)
Prometheus parallelism 2 (failed)
No metrics parallelism 2 (succeeded)

A post-OOM heap dump (JMX, parallelism 2) shows two main consumption points:
1. A big value (40MB) for some task configuration
2. Many instances (~270k) of a heavy (20KB) value in StreamConfig

It seems all these heavy values are related to the weird task names, 
which include all the operations:
Received task Source: source1 -> SourceConversion[2001] -> mapping1 
-> SourceConversion[2003] ->

Re: Problem with KafkaSource and watermark idleness

2022-08-15 Thread David Anderson
Yan, I've created https://issues.apache.org/jira/browse/FLINK-28975 to
track this.

Regards,
David

On Sun, Aug 14, 2022 at 6:38 PM Yan Shen  wrote:

> Thanks David,
>
> I am working on a Flink DataStream job that does a temporal join of two
> Kafka topics based on watermarks. The problem was quite obvious when I
> enabled idleness: data flowed through much faster, with different results,
> even though the topics were not idle.
>
> Regards.
>
> On Mon, Aug 15, 2022 at 12:12 AM David Anderson wrote:
>
>> Although I'm not very familiar with the design of the code involved, I
>> also looked at the code, and I'm inclined to agree with you that this is a
>> bug. Please do raise an issue.
>>
>> I'm wondering how you noticed this. I was thinking about how to write a
>> failing test, and I'm wondering if this has some impact that is easily
>> observed. (My first thought was "How can something this basic be broken?"
>> but then I realized that the impact is fairly subtle.)
>>
>> David
>>
>> On Sat, Aug 13, 2022 at 11:46 PM Yan Shen  wrote:
>>
>>> Hi all,
>>>
>>> After examining the source code further, I am quite sure 
>>> org.apache.flink.api.common.eventtime.WatermarksWithIdleness
>>> does not work with FLIP-27 sources.
>>>
>>> In org.apache.flink.streaming.api.operators.SourceOperator, there are
>>> separate instances of WatermarksWithIdleness created for each split
>>> output and the main output. There is multiplexing of watermarks between
>>> split outputs but no multiplexing between split output and main output.
>>>
>>> For a source such as org.apache.flink.connector.kafka.source.KafkaSource,
>>> there is only output from splits and no output from main. Hence the main
>>> output will (after an initial timeout) be marked as idle.
>>>
>>> The implementation of WatermarksWithIdleness is such that once an
>>> output is idle, it will periodically re-mark the output as idle. Since
>>> there is no multiplexing between split outputs and main output, the idle
>>> marks coming from main output will repeatedly set the output to idle even
>>> though there are events from the splits. Result is that the entire source
>>> is repeatedly marked as idle.
>>>
>>> For now, I have worked around this by implementing my own WatermarksWithIdleness
>>> that only marks the output as idle once (until it becomes active and then
>>> idle again), instead of repeatedly.
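>>>
>>> Roughly, the workaround looks like this sketch (simplified - the timing here is
>>> schematic and uses System.currentTimeMillis(), whereas the real class mirrors
>>> WatermarksWithIdleness more closely):
>>>
>>> import java.time.Duration;
>>> import org.apache.flink.api.common.eventtime.WatermarkGenerator;
>>> import org.apache.flink.api.common.eventtime.WatermarkOutput;
>>>
>>> public class MarkIdleOnceWatermarks<T> implements WatermarkGenerator<T> {
>>>
>>>     private final WatermarkGenerator<T> delegate;
>>>     private final long idleTimeoutMillis;
>>>     private long lastActivity = System.currentTimeMillis();
>>>     private boolean idle = false;
>>>
>>>     public MarkIdleOnceWatermarks(WatermarkGenerator<T> delegate, Duration idleTimeout) {
>>>         this.delegate = delegate;
>>>         this.idleTimeoutMillis = idleTimeout.toMillis();
>>>     }
>>>
>>>     @Override
>>>     public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
>>>         lastActivity = System.currentTimeMillis();
>>>         idle = false;
>>>         delegate.onEvent(event, eventTimestamp, output);
>>>     }
>>>
>>>     @Override
>>>     public void onPeriodicEmit(WatermarkOutput output) {
>>>         if (System.currentTimeMillis() - lastActivity > idleTimeoutMillis) {
>>>             if (!idle) {
>>>                 output.markIdle(); // mark idle only once per idle period
>>>                 idle = true;
>>>             }
>>>             // stay silent while idle, so the other outputs are not overridden again
>>>         } else {
>>>             delegate.onPeriodicEmit(output);
>>>         }
>>>     }
>>> }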
>>>
>>> I will try to raise an issue on this unless somebody can point out where
>>> I went wrong with this.
>>>
>>> Thanks.
>>>
>>> On Wed, Aug 10, 2022 at 1:26 PM Yan Shen  wrote:
>>>
 Hi,

 I am using an org.apache.flink.connector.kafka.source.KafkaSource with a
 watermark strategy like this:


 WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(10))
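
 For reference, the source is wired up more or less like this (a sketch - brokers,
 topic, group id and deserializer are placeholders):

 KafkaSource<String> source = KafkaSource.<String>builder()
     .setBootstrapServers("broker:9092")
     .setTopics("input-topic")
     .setGroupId("example-group")
     .setValueOnlyDeserializer(new SimpleStringSchema())
     .build();

 DataStream<String> stream = env.fromSource(
     source,
     WatermarkStrategy.<String>forMonotonousTimestamps()
         .withIdleness(Duration.ofSeconds(10)),
     "kafka-source");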

 I noticed that after a short while all the partitions seem to be marked
 as idle even though there are messages coming in.

 I made a copy of the class WatermarksWithIdleness and added some
 logging to trace what is happening.

 It seems there are 2 copies of this WatermarkGenerator created per
 partition. One during SourceOperator.initializeMainOutput and another
 during SourceOperator.createOutputForSplits.

 When there are new messages, only the one created during
 SourceOperator.createOutputForSplits has activity.

 It seems the one created during SourceOperator.initializeMainOutput
 will eventually output an idle mark as it has no activity and this causes
 the entire partition to be marked as idle.

 Is my understanding correct? If so, is this a feature or bug?

 Thanks.

>>>


How to include path information in data extracted from text files with FileSource

2022-08-15 Thread Ken Krugler
Hi all,

We’ve got time-stamped directories containing text files, stored in HDFS.

We regularly get new files added, so we’re using a FileSource with a 
monitoring duration so that it continuously picks up any new files.
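
The setup is roughly like this sketch (the path is a placeholder, and the exact 
line-format class name depends on the Flink version):

FileSource<String> source =
    FileSource.forRecordStreamFormat(
            new TextLineInputFormat(),
            new Path("hdfs:///data/enrichment"))
        .monitorContinuously(Duration.ofMinutes(1))
        .build();

DataStream<String> lines =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "enrichment-files");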

The challenge is that we need to include the parent directory’s timestamp in 
the output, for doing time-window joins of this enrichment data with another 
stream.

Previously I could extend the input format to extract path information and 
emit a Tuple2.

But with the new FileSource architecture, I’m really not sure whether it’s possible, 
or if so, what the right way to go about it is.

I’ve wandered through the source code (FileSource, AbstractFileSource, 
SourceReader, FileSourceReader, FileSourceSplit, ad nauseam) but haven’t seen 
any happy path to making that all work.

There might be a way, using some really ugly hacks to TextLineFormat, where it 
would reverse-engineer the FSDataInputStream to try to find information about 
the original file, but that feels very fragile.

Any suggestions?

Thanks!

— Ken


--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch