Re: batch job OOM

2020-01-22 Thread Fanbin Bu
I got the following error when running another job. Any suggestions? Caused by: java.lang.IndexOutOfBoundsException at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701) at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264) at

Re: where does flink store the intermediate results of a join and what is the key?

2020-01-22 Thread kant kodali
Is it a common practice to have a custom state backend? If so, what would be a popular custom backend? Can I use Elasticsearch as a state backend? Thanks! On Wed, Jan 22, 2020 at 1:42 AM Jark Wu wrote: > Hi Kant, > > 1) A list of rows is also sufficient in this case. Using a MapState is in >

Blocking KeyedCoProcessFunction.processElement1

2020-01-22 Thread Alexey Trenikhun
Hello, If KeyedCoProcessFunction.processElement1 blocks for a significant amount of time, will it prevent checkpoints? Thanks, Alexey

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
Jingsong, I had set the config value too large. After I changed it to a smaller number, it works now! Thank you for the help, really appreciate it! Fanbin On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li wrote: > Fanbin, > > Looks like your config is wrong; can you show your config code? > >

Re: batch job OOM

2020-01-22 Thread Jingsong Li
Fanbin, Looks like your config is wrong; can you show your config code? Best, Jingsong Lee On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu wrote: > Jingsong, > > Great, now I got a different error: > > java.lang.NullPointerException: Initial Segment may not be null > at >

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
Jingsong, Great, now I got a different error: java.lang.NullPointerException: Initial Segment may not be null at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65) at

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
You beat me to it. Let me try that. On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li wrote: > Fanbin, > > The document is here: > https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html > NOTE: you need to configure this in the TableConfig. > > Best, > Jingsong Lee > > On Thu, Jan

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
I saw the doc at https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html . Do I have to set that in the code, or can I do it through flink-conf.yaml? On Wed, Jan 22, 2020 at 7:54 PM Fanbin Bu wrote: > Jingsong, > > Thank you for the response. > Since I'm using Flink on EMR

Re: batch job OOM

2020-01-22 Thread Jingsong Li
Fanbin, The document is here: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html NOTE: you need to configure this in the TableConfig. Best, Jingsong Lee On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu wrote: > Jingsong, > > Thank you for the response. > Since I'm using Flink
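
For illustration, a minimal sketch of setting such an option through the TableConfig, assuming Flink 1.9's blink planner; the option key and value below are assumptions to verify against the linked page:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.TableEnvironment;

    // Assumes tEnv is an existing blink-planner TableEnvironment; the key is
    // the 1.9 hash-aggregation memory option, and the value is illustrative.
    TableEnvironment tEnv = ...; // obtained elsewhere
    Configuration configuration = tEnv.getConfig().getConfiguration();
    configuration.setString("table.exec.resource.hash-agg.memory", "512 mb");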

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
Jingsong, Thank you for the response. Since I'm using Flink on EMR, where the latest version is currently 1.9, the second option is ruled out, but I will keep it in mind for a future upgrade. I'm going to try the first option. It's probably a good idea to add that to the doc, for example:

Re: java.lang.StackOverflowError

2020-01-22 Thread 刘建刚
Thanks, I've already found the issue that resolves this: https://issues.apache.org/jira/browse/FLINK-10367 > On Jan 22, 2020, at 4:48 PM, zhisheng wrote: > > 1. Please don't send the same question to three mailing lists at once. > 2. Check whether there are any more telling exception logs. > > 刘建刚 wrote on Wed, Jan 22, 2020 at 10:25 AM: > >> I am using flink 1.6.2 on yarn. State backend is rocksdb. >>

Re: batch job OOM

2020-01-22 Thread Jingsong Li
Hi Fanbin, Thanks for using blink batch mode. The OOM is caused by insufficient managed memory in the hash aggregation. There are three options you can choose from: 1. Is your version Flink 1.9? 1.9 still uses a fixed memory configuration, so you need to increase the hash memory: -

Re: Usage of KafkaDeserializationSchema and KafkaSerializationSchema

2020-01-22 Thread Jason Kania
Thanks for responding. I am aware of where the topic is used. What I do not see is how to set the topic within the class that implements the KafkaSerializationSchema.serialize(T classObject, Long timestamp) method. The method must create and return a value of type ProducerRecord, but all the
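
One common pattern, sketched here as an illustration rather than the thread's answer: pass the target topic into the schema's constructor and use it when building the ProducerRecord. MyEvent is a placeholder type.

    import java.nio.charset.StandardCharsets;
    import javax.annotation.Nullable;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class MyEventSerializationSchema implements KafkaSerializationSchema<MyEvent> {
        private final String topic;

        public MyEventSerializationSchema(String topic) {
            this.topic = topic; // the topic comes in here, not through the interface
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(MyEvent element, @Nullable Long timestamp) {
            byte[] value = element.toString().getBytes(StandardCharsets.UTF_8);
            return new ProducerRecord<>(topic, value);
        }
    }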

Re: Usage of KafkaDeserializationSchema and KafkaSerializationSchema

2020-01-22 Thread David Magalhães
Hi Jason, The topic is passed to the *FlinkKafkaConsumer* constructor, followed by the *KafkaDeserializationSchema* and then the *Properties*. https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html new FlinkKafkaConsumer(kafkaTopic, new

Re: Flink ParquetAvroWriters Sink

2020-01-22 Thread aj
Hi Arvid, I want to keep generic records only; I do not want to keep the schema definition on the consumer side, and it should be resolved from the schema registry only. I am following the below post

Usage of KafkaDeserializationSchema and KafkaSerializationSchema

2020-01-22 Thread Jason Kania
Hello, I was looking for documentation in 1.9.1 on how to create implementations of the KafkaSerializationSchema and KafkaDeserializationSchema interfaces. I have created implementations in the past for the SerializationSchema and DeserializationSchema interfaces. Unfortunately, I can find no

Re: Custom label for Prometheus Exporter

2020-01-22 Thread Austin Cawley-Edwards
Following up: we deploy to K8s with one service per JobManager and TaskManager for metrics, and we add job-identifying labels to those. We also use the Prometheus Operator, which makes it easy to add those labels as dimensions when scraping. Best, Austin On Wed, Jan 22, 2020 at 7:21 PM Austin

Re: Custom label for Prometheus Exporter

2020-01-22 Thread Austin Cawley-Edwards
Hey Anaray, Have you checked out the "scope" configuration? [1] Best, Austin [1]: https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#scope On Wed, Jan 22, 2020 at 4:09 PM anaray wrote: > Hi flink team, > > Is there a way to add a custom label to flink metrics when
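
For reference, the scope formats from [1] are set in flink-conf.yaml, and the PrometheusReporter exports scope variables such as <host> as labels. A sketch with documented keys and illustrative patterns:

    # scope patterns are illustrative; variables like <host> become Prometheus labels
    metrics.scope.jm: <host>.jobmanager
    metrics.scope.tm: <host>.taskmanager.<tm_id>
    metrics.scope.tm.job: <host>.taskmanager.<tm_id>.<job_name>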

[State Processor API] how to convert savepoint back to broadcast state

2020-01-22 Thread Jin Yi
Hi there, I would like to read the savepoint (for broadcast state) back into the broadcast state. How should I do it? // load the existingSavepoint; ExistingSavepoint existingSavepoint = Savepoint.load(environment, "file:///tmp/new_savepoints", new MemoryStateBackend()); // read state from
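
The read side of that snippet would go through ExistingSavepoint#readBroadcastState. A minimal sketch, where the operator uid, state name, and types are assumptions that must match the MapStateDescriptor used when the state was written:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.state.api.ExistingSavepoint;
    import org.apache.flink.state.api.Savepoint;

    ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    ExistingSavepoint existingSavepoint =
        Savepoint.load(environment, "file:///tmp/new_savepoints", new MemoryStateBackend());

    // uid, state name, and types are placeholders; they must match the operator
    // uid and the MapStateDescriptor under which the broadcast state was written.
    DataSet<Tuple2<String, Long>> broadcastState = existingSavepoint.readBroadcastState(
        "broadcast-operator-uid", "broadcast-state-name", Types.STRING, Types.LONG);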

Re: batch job OOM

2020-01-22 Thread Fanbin Bu
Tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 20 -yjm 8096 myjar, and still got the same OOM exception. My SQL is like: select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums from table group by id, hop(created_at,

batch job OOM

2020-01-22 Thread Fanbin Bu
Hi, I have a batch job using the blink planner and got the following error. I was able to successfully run the same job with Flink 1.8 on YARN. I set the conf as: taskmanager.heap.size: 5m and the Flink UI gives me Last Heartbeat: 20-01-22 14:56:25 ID: container_1579720108062_0018_01_20 Data

TableSource being duplicated

2020-01-22 Thread Benoît Paris
Hello all! I'm having a problem with a TableSource's DataStream being duplicated when pulled on by 2 sinks. I understand that sometimes the best plan might just be to duplicate the TableSource/SourceFunction and read it both times; but in my case I can't quite reproduce the data as, say, Kafka would. I

Custom label for Prometheus Exporter

2020-01-22 Thread anaray
Hi Flink team, Is there a way to add a custom label to Flink metrics when using the Prometheus Exporter? I need to add a label= for the JobManager metrics. As of now I see only the host label, for example *flink_jobmanager_Status_JVM_Memory_Direct_Count{host="localhost",} 18.0* This is not of big

Re: How to get Task metrics with StatsD metric reporter?

2020-01-22 Thread John Smith
Hi, 1- Yes, I have spaces in the job name and task. How do you configure the metric scope for a particular job? 2- I opted for the second solution: I forked my own StatsD reporter and squashed all spaces. Here:

Re: Flink ParquetAvroWriters Sink

2020-01-22 Thread Arvid Heise
Hi Anuj, I recommend using the ConfluentRegistryAvroDeserializationSchema [1] with a specific record that has been generated with the Avro Maven Plugin [2] or the Avro Gradle Plugin [3]. That should result in almost no code and maximal maintainability. [1]
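
A minimal sketch of that recommendation, assuming MyRecord is a SpecificRecord generated by the Avro plugin, and with placeholder addresses and topic:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    // MyRecord, the registry URL, the bootstrap servers, and the topic are
    // assumptions for illustration only.
    DeserializationSchema<MyRecord> schema =
        ConfluentRegistryAvroDeserializationSchema.forSpecific(MyRecord.class, "http://schema-registry:8081");
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "kafka:9092");
    FlinkKafkaConsumer<MyRecord> consumer = new FlinkKafkaConsumer<>("events", schema, properties);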

Re: Flink ParquetAvroWriters Sink

2020-01-22 Thread aj
Hi Arvid, I have implemented the code with the envelope schema as you suggested, but now I am facing issues with the consumer. I have written code like this: FlinkKafkaConsumer010 kafkaConsumer010 = new FlinkKafkaConsumer010(KAFKA_TOPICS, new

Re: [DISCUSS] decentralized scheduling strategy is needed

2020-01-22 Thread Till Rohrmann
Thanks for reporting the issue, HuWeihua. Choosing the right scheduling strategy when using YARN with potentially infinite resources can be quite hard, because you don't know over how many TaskExecutors one should distribute the tasks. It becomes easier if one can configure the minimum number of

Re: Flink Metrics - PrometheusReporter

2020-01-22 Thread Sidney Feiner
Ok, I configured the PrometheusReporter's ports to be a range, and now every TaskManager has its own port where I can see its metrics. Thank you very much! Sidney Feiner / Data Platform Developer M: +972.528197720 / Skype: sidney.feiner.startapp
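
For reference, the range is set through the reporter's port option in flink-conf.yaml; a sketch with an illustrative range:

    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.port: 9250-9260  # a range gives each process on a host its own port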

Re: Flink configuration on Docker deployment

2020-01-22 Thread Soheil Pourbafrani
Thanks a lot! On Wed, Jan 22, 2020 at 3:58 AM Yang Wang wrote: > Hi Soheil, > > Since you are not using any container orchestration framework (e.g. > docker-compose, Kubernetes, > Mesos), you need to manually update the flink-conf.yaml in your Docker > images. Usually, it is > located in the

Re: Flink Metrics - PrometheusReporter

2020-01-22 Thread Chesnay Schepler
Metrics are exposed via reporters by each process separately, whereas the WebUI aggregates metrics. As such, you have to configure Prometheus to also scrape the TaskExecutors. On 22/01/2020 16:58, Sidney Feiner wrote: Hey, I've been trying to use the PrometheusReporter, and when I used it

Flink Metrics - PrometheusReporter

2020-01-22 Thread Sidney Feiner
Hey, I've been trying to use the PrometheusReporter, and when I used it locally on my computer, I would access the port I configured and see all the metrics I've created. In production, we use High Availability mode, and when I try to access the JobManager's metrics on the port I've configured on

Re: Custom Metrics outside RichFunctions

2020-01-22 Thread David Magalhães
Thanks for the feedback. I will use ElastAlert to generate an alarm from the logs. On Wed, Jan 22, 2020, 15:03 Chesnay Schepler wrote: > It is not possible to access metrics from within a schema. > > I can't think of a non-hacky workaround (the hacky one being to create a > custom Kafka

Re: Custom Metrics outside RichFunctions

2020-01-22 Thread Chesnay Schepler
It is not possible to access metrics from within a schema. I can't think of a non-hacky workaround (the hacky one being to create a custom Kafka consumer that checks the schema class, casts it to your specific class, and then calls a method on your schema that accepts a metric group). On
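
A rough sketch of that hacky workaround, with every name hypothetical: a schema interface that can receive a metric group, and a consumer subclass that hands one over in open():

    import java.util.Properties;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.MetricGroup;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

    // Hypothetical: a schema that accepts a metric group after construction.
    interface MetricAwareSchema<T> extends KafkaDeserializationSchema<T> {
        void setMetricGroup(MetricGroup group);
    }

    // Hypothetical consumer subclass that passes its runtime metric group on.
    class MetricReportingConsumer<T> extends FlinkKafkaConsumer<T> {
        private final MetricAwareSchema<T> schema;

        MetricReportingConsumer(String topic, MetricAwareSchema<T> schema, Properties props) {
            super(topic, schema, props);
            this.schema = schema;
        }

        @Override
        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            schema.setMetricGroup(getRuntimeContext().getMetricGroup());
        }
    }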

Re: request for a flink sink

2020-01-22 Thread zhisheng
Hi, Flink doesn't have a Facebook faiss connector at the moment; you can write a custom sink (implement SinkFunction). 容祖儿 wrote on Wed, Jan 22, 2020 at 7:55 PM: > Hi members, > > Do you know if there is a sink that writes data to Facebook faiss [1]? > I am looking for a sink class like this one [2]. > > [1]
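
A skeleton for such a custom sink, purely as a sketch: faiss itself is a C++/Python library, so FaissClient below is a hypothetical client for whatever serving layer fronts the index.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class FaissSink extends RichSinkFunction<float[]> {
        // FaissClient is a hypothetical stand-in, not an existing library class.
        private transient FaissClient client;

        @Override
        public void open(Configuration parameters) {
            client = new FaissClient("http://faiss-service:8080"); // placeholder address
        }

        @Override
        public void invoke(float[] vector, Context context) {
            client.add(vector); // hypothetical call adding one vector to the index
        }

        @Override
        public void close() {
            if (client != null) {
                client.close();
            }
        }
    }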

Re: Custom Metrics outside RichFunctions

2020-01-22 Thread David Magalhães
Hi Yun, I'm trying to use it inside a custom *DeserializationSchema*. Here is the constructor of *FlinkKafkaConsumer*. Inside the *DeserializationSchema* I can't use *getRuntimeContext()*. FlinkKafkaConsumer

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-22 Thread Piotr Nowojski
Hi, This is probably a known issue in Hadoop [1]. Unfortunately, it was only fixed in 3.3.0. Piotrek [1] https://issues.apache.org/jira/browse/HADOOP-15658 > On 22 Jan 2020, at 13:56, Till Rohrmann wrote: > > Thanks for reporting this

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-22 Thread Till Rohrmann
Thanks for reporting this issue, Mark. I'm pulling Klou into this conversation, who knows more about the StreamingFileSink. @Klou, does the StreamingFileSink rely on DeleteOnExitHooks to clean up files? Cheers, Till On Tue, Jan 21, 2020 at 3:38 PM Mark Harris wrote: > Hi, > > We're using flink

request for a flink sink

2020-01-22 Thread 容祖儿
Hi members, Do you know if there is a sink that writes data to Facebook faiss [1]? I am looking for a sink class like this one [2]. [1] https://github.com/facebookresearch/faiss [2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/connectors/rabbitmq.html Thank you.

Re: where does flink store the intermediate results of a join and what is the key?

2020-01-22 Thread Jark Wu
Hi Kant, 1) A list of rows is also sufficient in this case. Using a MapState is in order to retract a row faster and to save on storage size. 2) The State Processor API is usually used to process savepoints. I'm afraid the performance is not good enough to use it for querying. On the other side, AFAIK, State
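
To illustrate point 1 with a sketch (illustrative only, not Flink's internal join implementation): keying the matched rows by the row itself with a counter turns a retraction into a single map update instead of a scan over a list.

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.types.Row;

    // In a keyed RichFunction's open(): one counter per distinct row.
    MapStateDescriptor<Row, Integer> descriptor = new MapStateDescriptor<>(
        "join-rows", Types.ROW(Types.STRING, Types.INT), Types.INT);
    MapState<Row, Integer> joinRows = getRuntimeContext().getMapState(descriptor);

    // Accumulate (in processElement, which may throw Exception):
    Integer count = joinRows.get(row);
    joinRows.put(row, count == null ? 1 : count + 1);

    // Retract: decrement, removing the entry once it reaches zero.
    Integer current = joinRows.get(row);
    if (current != null) {
        if (current <= 1) {
            joinRows.remove(row);
        } else {
            joinRows.put(row, current - 1);
        }
    }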

Re: Influxdb reporter not honouring the metrics scope

2020-01-22 Thread David Anderson
Gaurav, I haven't used it for a couple of years, so I don't know if it still works, but https://github.com/jgrier/flink-stuff/tree/master/flink-influx-reporter is an InfluxDB reporter (wrapped around https://github.com/davidB/metrics-influxdb/tree/master/src/main/java/metrics_influxdb) that uses

Re: How to get Task metrics with StatsD metric reporter?

2020-01-22 Thread Chesnay Schepler
I presume your job/task names contain a space, which is included in the metrics scope? You can either configure the metric scope such that the job/task ID is included instead, or create a modified version of the StatsDReporter that filters out additional characters (i.e., override
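
For illustration, a sketch of such a modified reporter; that the relevant hook is the reporter's CharacterFilter method, filterCharacters, is an assumption to verify against your Flink version:

    import org.apache.flink.metrics.statsd.StatsDReporter;

    // Sketch: extend the stock reporter and additionally squash spaces.
    public class SpaceFilteringStatsDReporter extends StatsDReporter {
        @Override
        public String filterCharacters(String input) {
            // assumed hook; keeps the default filtering, then replaces spaces
            return super.filterCharacters(input).replace(' ', '_');
        }
    }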

Re: java.lang.StackOverflowError

2020-01-22 Thread zhisheng
1. Please don't send the same question to three mailing lists at once. 2. Check whether there are any more telling exception logs. 刘建刚 wrote on Wed, Jan 22, 2020 at 10:25 AM: > I am using flink 1.6.2 on yarn. State backend is rocksdb. > > > On Jan 22, 2020, at 10:15 AM, 刘建刚 wrote: > > > > I have a flink job which fails occasionally. I am eager to avoid > this problem. Can anyone help me? The error stacktrace

Re: Re: Taskmanager fails to connect to Jobmanager [Could not find any IPv4 address that is not loopback or link-local. Using localhost address.]

2020-01-22 Thread Kumar Bolar, Harshith
Thank you, Yangze and Yang. It turns out the high-availability.cluster-id parameter on the TM and JM was different. After updating it, the issue went away. On 17/01/20, 3:14 PM, "Yangze Guo" wrote: Hi Harshith, As a supplementary note to Yang, the issue seems to be that something

Re: flink on yarn job fails to start with "The assigned slot container_e10_1579661300080_0005_01_000002_0 was removed"

2020-01-22 Thread zhisheng
It looks like your job had crashed earlier. 郑洁锋 wrote on Wed, Jan 22, 2020 at 11:16 AM: > Hi all, > When starting a flink on yarn job, I got the error "The assigned slot > container_e10_1579661300080_0005_01_02_0 was removed". > Environment: flink 1.8.1, cdh 5.14.2, kafka 0.10, jdk 1.8.0_241 > > The flink version is 1.8.1; the logs on yarn: > > 20/01/22 11:07:53 INFO