FileSource may cause akka.pattern.AskTimeoutException, and akka.ask.timeout has no effect

2021-06-09 Thread
Hello all,

Our team encounters *akka.pattern.AskTimeoutException* when starting the
jobmanager. Based on the error message, we tried setting *akka.ask.timeout*
and *web.timeout* to 360s, but neither of them takes effect.

We suspect the issue may be caused by *FileSource.forRecordFileFormat*. The
application loads files in batch mode to rebuild our historical data.
The job runs normally on a small batch, but it breaks when running over a
large number of files (around 3 files distributed in 1500 folders).

The Flink application runs on Kubernetes in application mode, and the files
are stored in Google Cloud Storage.
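
For context, a simplified sketch of how the source is built is below; the bucket path, the generated folder list, and the TextLineFormat / forRecordStreamFormat combination are only stand-ins for our real Parquet format and GCS layout.

```
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// One Path per top-level folder; the real job has roughly 1500 of them, and
// FileSource enumerates every file underneath each path when the job starts.
val folders = (1 to 1500).map(i => new Path(f"gs://our-bucket/history/folder-$i%04d"))

val source = FileSource
  .forRecordStreamFormat(new TextLineFormat(), folders: _*)
  .build()

env.fromSource(source, WatermarkStrategy.noWatermarks(), "historical-files")
```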

Our questions are:
1. How can we enlarge akka.ask.timeout correctly in our case?
2. Is this caused by FileSource? If yes, could you provide some suggestions to
prevent it?


Our settings are as follows.
```
2021-06-10 03:44:14,317 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: kubernetes.container.image, */:**.*.**
2021-06-10 03:44:14,317 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: fs.hdfs.hadoopconfig, /opt/flink/conf
2021-06-10 03:44:14,317 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2021-06-10 03:44:14,317 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: kubernetes.rest-service.exposed.type, ClusterIP
2021-06-10 03:44:14,317 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability.jobmanager.port, 6123
2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: akka.ask.timeout, 360s
2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.memory.write-buffer-ratio, 0.7
2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: metrics.reporter.prom.class, org.apache.flink.metrics.prometheus.PrometheusReporter
2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.storage.fs.memory-threshold, 1048576
2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.checkpointing.unaligned, true
2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: web.timeout, 100
2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.target, kubernetes-application
2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: restart-strategy.fixed-delay.attempts, 2147483647
2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 8g
2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.rpc.port, 6122
2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: akka.framesize, 104857600b
2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: containerized.master.env.HADOOP_CLASSPATH, /opt/flink/conf:/opt/hadoop-3.1.1/share/hadoop/common/lib/*:/opt/hadoop-3.1.1/share/hadoop/common/*:/opt/hadoop-3.1.1/share/hadoop/hdfs:/opt/hadoop-3.1.1/share/hadoop/hdfs/lib/*:/opt/hadoop-3.1.1/share/hadoop/hdfs/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/lib/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/*:/opt/hadoop-3.1.1/share/hadoop/yarn:/opt/hadoop-3.1.1/share/hadoop/yarn/lib/*:/opt/hadoop-3.1.1/share/hadoop/yarn/*:/contrib/capacity-scheduler/*.jar
2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.attached, true
2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: internal.cluster.execution-mode, NORMAL
2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability, org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.checkpointing.externalized-checkpoint-retention, DELETE_ON_CANCELLATION
2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property
```

Re: SourceFunction cannot run in Batch Mode

2021-06-02 Thread
Sorry, there are some typos that may be misleading.

The SourceFunction will be detected as *Streaming Mode*.

陳樺威 wrote on Thursday, June 3, 2021 at 1:29 PM:

> Hi,
>
> Currently, we want to use batch execution mode [0] to consume historical
> data and rebuild the state for our streaming application.
> The Flink app will be run on demand and shut down after completing all the
> file processing.
> We implemented a SourceFunction [1] to consume bounded parquet files from
> GCS. However, the function will be detected as Batch Mode.
>
> Our question is: how can we implement a SourceFunction as a bounded DataStream?
>
> Thanks!
> Oscar
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>
>
>
>


SourceFunction cannot run in Batch Mode

2021-06-02 Thread
Hi,

Currently, we want to use batch execution mode [0] to consume historical
data and rebuild the state for our streaming application.
The Flink app will be run on demand and shut down after completing all the
file processing.
We implemented a SourceFunction [1] to consume bounded parquet files from GCS.
However, the function will be detected as Batch Mode.

Our question is: how can we implement a SourceFunction as a bounded DataStream?

Thanks!
Oscar

[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
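
A minimal sketch of one possible direction, assuming the new unified Source API (e.g. FileSource, which is bounded by default) can replace the SourceFunction; the path, the TextLineFormat, and the names below are illustrative only.

```
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Request batch execution explicitly; AUTOMATIC would also pick it when every
// source in the job is bounded.
env.setRuntimeMode(RuntimeExecutionMode.BATCH)

// FileSource reads a fixed set of files, so it reports itself as bounded,
// whereas a SourceFunction added via addSource is always treated as unbounded.
val bounded = FileSource
  .forRecordStreamFormat(new TextLineFormat(), new Path("gs://our-bucket/history/"))
  .build()

env
  .fromSource(bounded, WatermarkStrategy.noWatermarks(), "bounded-history")
  .print()

env.execute("rebuild-state-in-batch")
```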


Re: KafkaSource metrics

2021-05-24 Thread
Hi Ardhani,

Thanks for your kind reply.

Our team used the metrics you provided before, but they disappeared after
migrating to the new KafkaSource.

We initialize the KafkaSource with the following code.
```

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.kafka.clients.consumer.OffsetResetStrategy

val consumer: KafkaSource[T] = KafkaSource.builder[T]()
  .setProperties(properties)
  .setTopics(topic)
  .setValueOnlyDeserializer(deserializer)
  // Start from the committed offsets; fall back to LATEST if none exist.
  .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
  .build()

env
  .fromSource(consumer, WatermarkStrategy.noWatermarks(), uid)
  .setParallelism(math.min(parallelism, env.getParallelism))
  .setMaxParallelism(parallelism)
  .name(uid).uid(uid)
  .rebalance

```
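
As a possible stopgap (not something we have verified), a custom meter placed right after the source could at least expose the consume rate while the connector metrics are missing; the metric name and the 60-second MeterView window below are arbitrary.

```
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.{Meter, MeterView}

// Pass-through map that marks one meter event per record, so the rate shows up
// under the operator's metric group (e.g. via the Prometheus reporter).
class ThroughputMeter[T] extends RichMapFunction[T, T] {
  @transient private var meter: Meter = _

  override def open(parameters: Configuration): Unit = {
    meter = getRuntimeContext.getMetricGroup.meter("recordsPerSecond", new MeterView(60))
  }

  override def map(value: T): T = {
    meter.markEvent()
    value
  }
}
```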

Oscar

Ardhani Narasimha wrote on Tuesday, May 25, 2021 at 12:08 AM:

> Use the following, respectively:
>
> flink_taskmanager_job_task_operator_KafkaConsumer_bytes_consumed_rate -
> Consumer rate
> flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max -
> Consumer lag
> flink_taskmanager_job_task_operator_KafkaConsumer_commit_latency_max -
> commit latency
>
> unsure if reactive mode makes any difference.
> On Mon, May 24, 2021 at 7:44 PM 陳樺威  wrote:
>
>> Hello,
>>
>> Our team is trying out reactive mode and replacing FlinkKafkaConsumer with
>> the new KafkaSource.
>> But we can’t find a list of the KafkaSource metrics. Does anyone have any
>> idea? In our case, we want to know the Kafka consume delay and consume rate.
>>
>> Thanks,
>> Oscar
>>
>
>


KafkaSource metrics

2021-05-24 Thread
Hello,

Our team is trying out reactive mode and replacing FlinkKafkaConsumer with
the new KafkaSource.
But we can’t find a list of the KafkaSource metrics. Does anyone have any idea?
In our case, we want to know the Kafka consume delay and consume rate.

Thanks,
Oscar