Re: Heartbeat of TaskManager timed out.

2020-07-06 Thread Ori Popowski
Hi,

I just wanted to update that the problem is now solved!

I suspect that Scala's flatten() method has a memory problem on very large
lists (> 2 billion elements). When using Scala Lists, the memory seems to
leak but the app keeps running, and when using Scala Vectors, a weird
IllegalArgumentException is thrown [1].

I implemented my own flatten() method using Arrays and quickly ran into
NegativeArraySizeException since the integer representing the array size
wrapped around at Integer.MaxValue and became negative. After I started
catching this exception, all my cluster problems resolved: the checkpoints,
the heartbeat timeouts, and the memory and CPU utilization all went back to
normal.
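
For reference, a minimal sketch of such a manual flatten, computing the total
size as a Long so that the wrap-around past Integer.MaxValue surfaces as an
explicit error instead of a NegativeArraySizeException (names are illustrative,
not the production code):

// Illustrative sketch: flatten a sequence of byte arrays, guarding the size.
def flattenBytes(chunks: Seq[Array[Byte]]): Array[Byte] = {
  // Sum the lengths as Long so that > 2 billion elements does not silently wrap.
  val totalSize: Long = chunks.map(_.length.toLong).sum
  require(totalSize <= Int.MaxValue, s"flattened size $totalSize exceeds Int.MaxValue")
  val out = new Array[Byte](totalSize.toInt)
  var pos = 0
  chunks.foreach { chunk =>
    System.arraycopy(chunk, 0, out, pos, chunk.length)
    pos += chunk.length
  }
  out
}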

I still need to confirm my suspicion towards Scala's flatten() though,
since I haven't "lab-tested" it.

[1] https://github.com/NetLogo/NetLogo/issues/1830

On Sun, Jul 5, 2020 at 2:21 PM Ori Popowski  wrote:

> Hi,
>
> I initially thought this, which is why my heap is almost 30GiB.
> However, I started to analyze the Java Flight Recorder files, and I
> suspect there's a memory leak in Scala's flatten() method.
> I changed the line that uses flatten(), and instead of flatten() I'm just
> creating a ByteArray the size flatten() would have returned, and I no
> longer have the heartbeat problem.
>
> So now my code is
> val recordingData =
> Array.fill[Byte](recordingBytes.map(_.length).sum)(0)
>
> instead of
> val recordingData = recordingBytes.flatten
>
> I attach a screenshot of Java Mission Control
>
>
>
> On Fri, Jul 3, 2020 at 7:24 AM Xintong Song  wrote:
>
>> I agree with Roman's suggestion for increasing heap size.
>>
>> It seems that the heap grows faster than freed. Thus eventually the Full
>> GC is triggered, taking more than 50s and causing the timeout. However,
>> even the full GC frees only 2GB space out of the 28GB max size. That
>> probably suggests that the max heap size is not sufficient.
>>
>>> 2020-07-01T10:15:12.869+: [Full GC (Allocation Failure)
>>>  28944M->26018M(28960M), 51.5256128 secs]
>>> [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
>>> 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace:
>>> 113556K->112729K(1150976K)]
>>>   [Times: user=91.08 sys=0.06, real=51.53 secs]
>>
>>
>> I would not be so sure about the memory leak. I think it could be a
>> normal pattern that memory keeps growing as more data is processed. E.g.,
>> from the provided log, I see window operation tasks executed in the task
>> manager. Such operation might accumulate data until the window is emitted.
>>
>> Maybe Ori you can also take a look at the task manager log when the job
>> runs with Flink 1.9 without this problem, see how the heap size changed. As
>> I mentioned before, it is possible that, with the same configurations Flink
>> 1.10 has less heap size compared to Flink 1.9, due to the memory model
>> changes.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Thu, Jul 2, 2020 at 8:58 PM Ori Popowski  wrote:
>>
>>> Thank you very much for your analysis.
>>>
>>> When I said there was no memory leak - I meant that from the specific
>>> TaskManager I monitored in real-time using JProfiler.
>>> Unfortunately, this problem occurs only in 1 of the TaskManager and you
>>> cannot anticipate which. So when you pick a TM to profile at random -
>>> everything looks fine.
>>>
>>> I'm running the job again with Java FlightRecorder now, and I hope I'll
>>> find the reason for the memory leak.
>>>
>>> Thanks!
>>>
>>> On Thu, Jul 2, 2020 at 3:42 PM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
 Thanks, Ori

 From the log, it looks like there IS a memory leak.

 At 10:12:53 there was the last "successful" GC, when 13Gb was freed in
 0.4653809 secs:
 [Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M
 Heap: 23280.3M(28960.0M)->10047.0M(28960.0M)]

 Then the heap grew from 10G to 28G with GC not being able to free up
 enough space:
 [Eden: 2544.0M(2544.0M)->0.0B(856.0M) Survivors: 2176.0M->592.0M Heap:
 12591.0M(28960.0M)->11247.0M(28960.0M)]
 [Eden: 856.0M(856.0M)->0.0B(1264.0M) Survivors: 592.0M->184.0M Heap:
 12103.0M(28960.0M)->11655.0M(28960.0M)]
 [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
 12929.0M(28960.0M)->12467.0M(28960.0M)]
 ... ...
 [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
 28042.6M(28960.0M)->27220.6M(28960.0M)]
 [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
 28494.5M(28960.0M)->28720.6M(28960.0M)]
 [Eden: 224.0M(1264.0M)->0.0B(1448.0M) Survivors: 184.0M->0.0B Heap:
 28944.6M(28960.0M)->28944.6M(28960.0M)]

 Until 10:15:12 when GC freed almost 4G - but it took 51 seconds and
 heartbeat timed out:
 2020-07-01T10:15:12.869+: [Full GC (Allocation Failure)
  28944M->26018M(28960M), 51.5256128 secs]
   [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
 28944

Re: Kafka Rate Limit in FlinkConsumer ?

2020-07-06 Thread David Magalhães
Thanks for the reply Chen.

My use case is a "simple" get from Kafka into S3. The job can read very
quickly from Kafka and S3 is having some issues keeping up. The
backpressure don't have enough time to actuate in this case, and when it
reaches the checkpoint time some errors like heartbeat timeout or task
manager didn't reply back starts to happen.

I will investigate further and try this example.
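
For reference, one lightweight way to throttle before the S3 sink, independent
of the Kafka consumer itself, is a rate-limiting map; a minimal sketch assuming
Guava's RateLimiter is on the classpath (class name and rate are illustrative):

import com.google.common.util.concurrent.RateLimiter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Sketch: each parallel subtask is limited to `recordsPerSecond` records.
class ThrottlingMap[T](recordsPerSecond: Double) extends RichMapFunction[T, T] {
  @transient private var limiter: RateLimiter = _

  override def open(parameters: Configuration): Unit = {
    limiter = RateLimiter.create(recordsPerSecond)
  }

  override def map(value: T): T = {
    limiter.acquire() // blocks until a permit is available
    value
  }
}

// usage (names illustrative): kafkaStream.map(new ThrottlingMap[Event](500.0)).addSink(s3Sink)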

On Mon, Jul 6, 2020 at 5:45 PM Chen Qin  wrote:

> My two cents here,
>
> - A Flink job already has backpressure, so rate limiting can be done by
> setting the parallelism to a proper number in some use cases. There is an
> open issue about checkpointing reliability under backpressure; the
> community seems to be working on it.
>
> - Rate limiting can be abused easily and cause a lot of confusion. Think
> about a use case where you have two streams doing a simple interval join.
> Unless you are able to rate limit both with proper values dynamically, you
> might see timestamp and watermark gaps keep increasing, causing
> checkpointing failures.
>
> So the question might be: instead of looking at the rate limit of one
> source, how to slow down all sources without ever-increasing time and
> watermark gaps. It sounds complicated already.
>
> With that being said, if you really want to have a rate limit on your own,
> you can try the following code :) It works well for us.
>
> public class SynchronousKafkaConsumer extends FlinkKafkaConsumer {
>
>   protected static final Logger LOG = 
> LoggerFactory.getLogger(SynchronousKafkaConsumer.class);
>
>   private final double topicRateLimit;
>   private transient RateLimiter subtaskRateLimiter;
>
>
> @Override
> public void open(Configuration configuration) throws Exception {
>   Preconditions.checkArgument(
>   topicRateLimit / getRuntimeContext().getNumberOfParallelSubtasks() > 
> 0.1,
>   "subtask ratelimit should be greater than 0.1 QPS");
>   subtaskRateLimiter = RateLimiter.create(
>   topicRateLimit / getRuntimeContext().getNumberOfParallelSubtasks());
>   super.open(configuration);
> }
>
>
> @Override
> protected AbstractFetcher createFetcher(
> SourceContext sourceContext,
> Map partitionsWithOffsets,
> SerializedValue> watermarksPeriodic,
> SerializedValue> watermarksPunctuated,
> StreamingRuntimeContext runtimeContext,
> OffsetCommitMode offsetCommitMode,
> MetricGroup consumerMetricGroup, boolean useMetrics)
> throws Exception {
>
>   return new KafkaFetcher(
>   sourceContext,
>   partitionsWithOffsets,
>   watermarksPeriodic,
>   watermarksPunctuated,
>   runtimeContext.getProcessingTimeService(),
>   runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
>   runtimeContext.getUserCodeClassLoader(),
>   runtimeContext.getTaskNameWithSubtasks(),
>   deserializer,
>   properties,
>   pollTimeout,
>   runtimeContext.getMetricGroup(),
>   consumerMetricGroup,
>   useMetrics) {
> @Override
> protected void emitRecord(T record,
>   KafkaTopicPartitionState 
> partitionState,
>   long offset) throws Exception {
>   subtaskRateLimiter.acquire();
>   if (record == null) {
> consumerMetricGroup.counter("invalidRecord").inc();
>   }
>   super.emitRecord(record, partitionState, offset);
> }
>
> @Override
> protected void emitRecordWithTimestamp(T record,
> 
> KafkaTopicPartitionState partitionState,
>long offset, long timestamp) 
> throws Exception {
>   subtaskRateLimiter.acquire();
>   if (record == null) {
> consumerMetricGroup.counter("invalidRecord").inc();
>   }
>   super.emitRecordWithTimestamp(record, partitionState, offset, 
> timestamp);
> }
>   };
>
> }
>
> Thanks,
>
> Chen
> Pinterest Data
>
>
> On Jul 6, 2020, at 7:43 AM, David Magalhães  wrote:
>
> I've noticed that this FLINK-11501 was implemented in
> flink-connector-kafka-0.10 [1], but it wasn't in the current version of the
> flink-connector-kafka. Is there any reason for this, and what would be the
> best way to implement rate-limit functionality in the current Kafka
> consumer?
>
> Thanks,
> David
>
> [1]
> https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
>
> [2]
> https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
>
>
>


Decompressing Tar Files for Batch Processing

2020-07-06 Thread Austin Cawley-Edwards
Hey all,

I need to ingest a tar file containing ~1GB of data in around 10 CSVs. The
data is fairly connected and needs some cleaning, which I'd like to do with
the Batch Table API + SQL (but have never used before). I've got a small
prototype loading the uncompressed CSVs and applying the necessary SQL,
which works well.

I'm wondering about the task of downloading the tar file and unzipping it
into the CSVs. Does this sound like something I can/should do in Flink, or
should I set up another process to download, unzip, and store in a
filesystem to then read with the Flink Batch job? My research is leading me
towards doing it separately but I'd like to do it all in the same job if
there's a creative way.

Thanks!
Austin
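
For reference, the separate pre-processing step could be as small as the sketch
below, assuming Apache Commons Compress is on the classpath and the archive is
a gzipped tar (drop the Gzip wrapper for a plain tar); paths and names are
illustrative:

import java.io.{BufferedInputStream, FileInputStream, FileOutputStream}
import java.nio.file.{Files, Paths}
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream

// Sketch: extract every regular file of a .tar.gz into outDir, then point the
// Flink batch job's CSV sources at outDir.
def untar(archivePath: String, outDir: String): Unit = {
  val tarIn = new TarArchiveInputStream(
    new GzipCompressorInputStream(new BufferedInputStream(new FileInputStream(archivePath))))
  try {
    var entry = tarIn.getNextTarEntry
    while (entry != null) {
      if (!entry.isDirectory) {
        val target = Paths.get(outDir, entry.getName)
        Files.createDirectories(target.getParent)
        val out = new FileOutputStream(target.toFile)
        try {
          val buffer = new Array[Byte](8192)
          var read = tarIn.read(buffer)
          while (read != -1) { out.write(buffer, 0, read); read = tarIn.read(buffer) }
        } finally out.close()
      }
      entry = tarIn.getNextTarEntry
    }
  } finally tarIn.close()
}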


SSL for QueryableStateClient

2020-07-06 Thread mail2so...@yahoo.co.in
Hello,
I am running Flink on Kubernetes, and from outside, access through the Ingress to a
proxy on Kubernetes is via SSL on port 443 only.
Can you please provide guidance on how to set up SSL for
QueryableStateClient, the client used to query the state.

Please let me know if any other details are needed.
Thanks & Regards
Souma Suvra Ghosh

Re: Asynchronous I/O poor performance

2020-07-06 Thread Arvid Heise
Hi Mark,

Async wait operators cannot be chained to sources so the messages go
through the network stack. Thus, having some latency is normal and cannot
be avoided. It can be tuned though, but I don't think that this is the
issue at hand as it should mostly impact latency and affect throughput
less. Since external I/O calls are much more heavy weight than our internal
communication, both the drop of throughput and the increase in latency are
usually dwarfed by the external I/O call costs.

Please try to increase the thread pool for akka as written in my previous
email and report back.

On Mon, Jul 6, 2020 at 9:44 PM Mark Zitnik  wrote:

> Hi Benchao,
>
> I have run this in the code:
>
> println(env.getConfig.getAutoWatermarkInterval)
>
> and got 200. I do fully understand how watermarks and the AsyncOperator
> work, but
> I have decided to make a simple test that should evaluate the time it
> takes to enter the asyncInvoke method, and it looks like it takes about
> 80ms, which is longer than the time it takes to get a response from my
> micro-service
>
> code below
>
> class AsyncDatabaseRequest extends RichAsyncFunction[String, (String, 
> String)] {
>
>   implicit lazy val executor: ExecutionContext = 
> ExecutionContext.fromExecutor(Executors.directExecutor())
>
>   /*
>   implicit val actorSystem = ActorSystem.apply("test", None, None, 
> Some(executor))
>   implicit val materializer = ActorMaterializer()
>   implicit val executionContext = actorSystem.dispatcher
>
>
>   println(materializer.system.name)
>   println("start")
>   */
> // redis-streaming-dev-new.xwudy5.ng.0001.use1.cache.amazonaws.com
>
>   // redis-streaming-dev-001.xwudy5.0001.use1.cache.amazonaws.com
>   var actorSystem: ActorSystem = null
>   var materializer: ActorMaterializer = null
>   var executionContext: ExecutionContextExecutor = null
>   //var akkaHttp: HttpExt = null
>
>   override def open(parameters: Configuration): Unit = {
> actorSystem = akka.actor.ActorSystem(UUID.randomUUID().toString, 
> Some(ConfigFactory.load("application.conf")), None, Some(executor))
> materializer = ActorMaterializer()(actorSystem)
> executionContext = actorSystem.dispatcher
> //akkaHttp = Http(actorSystem)
>   }
>
>   override def close(): Unit = {
> actorSystem.terminate()
>   }
>
>   override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, 
> String)]): Unit = {
> val start = str.toLong
> val delta = System.currentTimeMillis() - start
> resultFuture.complete(Iterable((str, s"${delta}")))
>   }
> }
>
>
> object Job {
>   def main(args: Array[String]): Unit = {
> // set up the execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> //env.enableCheckpointing(10)
> env.setParallelism(1)
>
> val someIntegers: DataStream[Long] = env.generateSequence(1, 100)
> //someIntegers.map { _ => System.currentTimeMillis()}.map{ s => 
> System.currentTimeMillis()-s}.print()
> val x : DataStream[String] = someIntegers.map( _ => 
> s"${System.currentTimeMillis()}")
> val resultStream: DataStream[(String, String)] = 
> AsyncDataStream.unorderedWait(x, new AsyncDatabaseRequest(), 10L, 
> TimeUnit.MILLISECONDS, 100)//.setParallelism(16)
>   //AsyncDataStream.unorderedWait(data , new 
> AsyncDatabaseRequest,3L,TimeUnit.SECONDS)
> resultStream.print()
> println(env.getConfig.getAutoWatermarkInterval)
> env.execute("Flink Scala API Skeleton")
>   }
> }
>
> is this normal behavior?
>
>
> On Mon, Jul 6, 2020 at 2:45 PM Benchao Li  wrote:
>
>> Hi Mark,
>>
>> According to your data, I think the config of AsyncOperator is OK.
>> There is one more config that might affect the throughput of
>> AsyncOperator, it's watermark.
>> Because unordered async operator still keeps the order between
>> watermarks, did you use
>> event time in your job, and if yes, what's the watermark interval in your
>> job?
>>
>> Mark Zitnik  于2020年7月5日周日 下午7:44写道:
>>
>>> Hi Benchao
>>>
>>> The capacity is 100
>>> Parallelism is 8
>>> Rpc req is 20ms
>>>
>>> Thanks
>>>
>>>
>>> On Sun, 5 Jul 2020, 6:16 Benchao Li,  wrote:
>>>
 Hi Mark,

 Could you give more details about your Flink job?
 - the capacity of AsyncDataStream
 - the parallelism of AsyncDataStream operator
 - the time of per blocked rpc request

 Mark Zitnik  于2020年7月5日周日 上午3:48写道:

> Hi
>
> In my flink application I need to enrich data using 
> AsyncDataStream.unorderedWait
> but I am getting poor performance. At the beginning I was just working
> with an http call, but I have switched to grpc. I am running on an 8-core
> node and getting a total of 3200 events per second; my service that I am
> using is not fully utilized and can produce up to 1 req/sec
>
> Flink job flow
> Reading from Kafka ~> some enrichment with unoderedwait ~> map ~>
> write to Kafka
>>

Re: Flink AskTimeoutException killing the jobs

2020-07-06 Thread M Singh
 Thanks Xintong.  I will check the logs.  
On Sunday, July 5, 2020, 09:29:31 PM EDT, Xintong Song 
 wrote:  
 
 As I already mentioned,

I would suggest looking into the jobmanager logs and GC logs to see if there's 
any problem that prevents the process from handling the RPC messages in a timely manner.


The Akka ask timeout does not seem to be the root problem to me.

Thank you~

Xintong Song




On Sat, Jul 4, 2020 at 12:12 AM M Singh  wrote:

 Hi Xintong/LakeShen:
We have the following setting in flink-conf.yaml

akka.ask.timeout: 180 s

akka.tcp.timeout: 180 s


But still see this exception.  Are there multiple akka.ask.timeout or 
additional settings required ?

Thanks
Mans
On Friday, July 3, 2020, 01:08:05 AM EDT, Xintong Song 
 wrote:  
 
 The configuration option you're looking for is `akka.ask.timeout`.




However, I'm not sure increasing this configuration would help in your case. 
The error message shows that there is a timeout on a local message. It is weird 
that a local message does not get replied to within 10 sec. I would suggest looking 
into the jobmanager logs and GC logs to see if there's any problem that prevents 
the process from handling the RPC messages in a timely manner.
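
If GC logs are not already being written, they can be enabled through
flink-conf.yaml, for example (the flags assume a Java 8 JVM, matching the GC
excerpts elsewhere in this digest; the log path is illustrative):

env.java.opts: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/flink-gc.log"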




Thank you~

Xintong Song




On Fri, Jul 3, 2020 at 3:51 AM M Singh  wrote:

Hi:
I am using Flink 1.10 on AWS EMR cluster.
We are getting AskTimeoutExceptions which is causing the flink jobs to die.   
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/resourcemanager#-1602864959]] after [1 ms]. 
Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A 
typical reason for `AskTimeoutException` is that the recipient actor didn't 
send a reply.
    at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
    at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
    ... 9 more

Can you please let me know where I can set this timeout? 
I could not find this specific timeout in the flink doc - Apache Flink 1.10 
Documentation: Configuration.

Thanks
Mans
  
  

Re: Asynchronous I/O poor performance

2020-07-06 Thread Mark Zitnik
Hi Benchao,

I have run this in the code:

println(env.getConfig.getAutoWatermarkInterval)

and got 200. I do fully understand how watermarks and the AsyncOperator
work, but
I have decided to make a simple test that should evaluate the time it takes
to enter the asyncInvoke method, and it looks like it takes about 80ms,
which is longer than the time it takes to get a response from my
micro-service

code below

class AsyncDatabaseRequest extends RichAsyncFunction[String, (String, String)] {

  implicit lazy val executor: ExecutionContext =
ExecutionContext.fromExecutor(Executors.directExecutor())

  /*
  implicit val actorSystem = ActorSystem.apply("test", None, None,
Some(executor))
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher


  println(materializer.system.name)
  println("start")
  */
// redis-streaming-dev-new.xwudy5.ng.0001.use1.cache.amazonaws.com

  // redis-streaming-dev-001.xwudy5.0001.use1.cache.amazonaws.com
  var actorSystem: ActorSystem = null
  var materializer: ActorMaterializer = null
  var executionContext: ExecutionContextExecutor = null
  //var akkaHttp: HttpExt = null

  override def open(parameters: Configuration): Unit = {
actorSystem = akka.actor.ActorSystem(UUID.randomUUID().toString,
Some(ConfigFactory.load("application.conf")), None, Some(executor))
materializer = ActorMaterializer()(actorSystem)
executionContext = actorSystem.dispatcher
//akkaHttp = Http(actorSystem)
  }

  override def close(): Unit = {
actorSystem.terminate()
  }

  override def asyncInvoke(str: String, resultFuture:
ResultFuture[(String, String)]): Unit = {
val start = str.toLong
val delta = System.currentTimeMillis() - start
resultFuture.complete(Iterable((str, s"${delta}")))
  }
}


object Job {
  def main(args: Array[String]): Unit = {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//env.enableCheckpointing(10)
env.setParallelism(1)

val someIntegers: DataStream[Long] = env.generateSequence(1, 100)
//someIntegers.map { _ => System.currentTimeMillis()}.map{ s =>
System.currentTimeMillis()-s}.print()
val x : DataStream[String] = someIntegers.map( _ =>
s"${System.currentTimeMillis()}")
val resultStream: DataStream[(String, String)] =
AsyncDataStream.unorderedWait(x, new AsyncDatabaseRequest(), 10L,
TimeUnit.MILLISECONDS, 100)//.setParallelism(16)
  //AsyncDataStream.unorderedWait(data , new
AsyncDatabaseRequest,3L,TimeUnit.SECONDS)
resultStream.print()
println(env.getConfig.getAutoWatermarkInterval)
env.execute("Flink Scala API Skeleton")
  }
}

is this normal behavior?


On Mon, Jul 6, 2020 at 2:45 PM Benchao Li  wrote:

> Hi Mark,
>
> According to your data, I think the config of AsyncOperator is OK.
> There is one more config that might affect the throughput of
> AsyncOperator, it's watermark.
> Because unordered async operator still keeps the order between watermarks,
> did you use
> event time in your job, and if yes, what's the watermark interval in your
> job?
>
> Mark Zitnik  于2020年7月5日周日 下午7:44写道:
>
>> Hi Benchao
>>
>> The capacity is 100
>> Parallelism is 8
>> Rpc req is 20ms
>>
>> Thanks
>>
>>
>> On Sun, 5 Jul 2020, 6:16 Benchao Li,  wrote:
>>
>>> Hi Mark,
>>>
>>> Could you give more details about your Flink job?
>>> - the capacity of AsyncDataStream
>>> - the parallelism of AsyncDataStream operator
>>> - the time of per blocked rpc request
>>>
>>> Mark Zitnik  于2020年7月5日周日 上午3:48写道:
>>>
 Hi

 In my flink application I need to enrich data using 
 AsyncDataStream.unorderedWait
 but I am getting poor performance. At the beginning I was just working with
 an http call, but I have switched to grpc. I am running on an 8-core node and
 getting a total of 3200 events per second; my service that I am using is not
 fully utilized and can produce up to 1 req/sec

 Flink job flow
 Reading from Kafka ~> some enrichment with unoderedwait ~> map ~> write
 to Kafka

 Using Akkad grpc code written in scala

 Thanks

>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>
>
> --
>
> Best,
> Benchao Li
>


Re: Flink Parallelism for various type of transformation

2020-07-06 Thread Arvid Heise
Hi Prasanna,

overcommitting cores was actually a recommended technique a while ago to
counter-balance I/O. So it's not bad per se.

However, with slot sharing each core is already doing the work for source,
transform, and sink, so it's not necessary. So I'd go with slots = cores, and I
rather strongly suggest switching to async I/O to perform the external
transformation. [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
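
For reference, a minimal sketch of what moving the per-record HTTP call into
async I/O could look like; the enrichment call here is a placeholder, not a
concrete client:

import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

// Sketch: enrich each event with a non-blocking call instead of a blocking map.
class HttpEnrichFunction extends RichAsyncFunction[String, String] {
  implicit lazy val ec: ExecutionContext = ExecutionContext.global

  override def asyncInvoke(event: String, resultFuture: ResultFuture[String]): Unit = {
    val response: Future[String] = callEnrichmentService(event)
    response.foreach(enriched => resultFuture.complete(Iterable(enriched)))
  }

  // Placeholder for the actual asynchronous HTTP client call.
  private def callEnrichmentService(event: String): Future[String] = Future(event)
}

// usage: AsyncDataStream.unorderedWait(events, new HttpEnrichFunction, 1, TimeUnit.SECONDS, 100)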

On Mon, Jul 6, 2020 at 7:01 PM Prasanna kumar 
wrote:

> Hi ,
>
> I used t2.medium machines for the task manager nodes. It has 2 CPU and 4GB
> memory.
>
> But the task manager screen shows that there are 4 slots.
>
> Generally we should match the number of slots to the number of cores.
>
> [image: image.png]
>
> Our pipeline is Source -> Simple Transform -> Sink.
>
> What happens when we have more slots than cores in following scenarios?
> 1) The transform is just changing of json format.
>
> 2)  When the transformation is done by hitting another server (HTTP
> request)
>
> Thanks,
> Prasanna.
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Asynchronous I/O poor performance

2020-07-06 Thread Arvid Heise
Hi Mark,

could you please check if you can tune akka? Usually in async I/O, the used
library uses a thread pool that becomes the actual bottleneck.

If you configure async I/O to use a capacity of 100 and parallelism of 8 on
one node, you also need to have ~800 threads in akka (500 might be enough
because of overhead) or else async I/O gets blocked while waiting for akka
threads to become available.
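
For reference, one way to raise that pool from code similar to what was posted
earlier in this thread is to overlay a larger default-dispatcher configuration
when creating the ActorSystem; the dispatcher settings and values below are
illustrative only:

import java.util.UUID
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Sketch: overlay a bigger default-dispatcher pool on top of application.conf.
val dispatcherOverride = ConfigFactory.parseString(
  """
    |akka.actor.default-dispatcher.fork-join-executor {
    |  parallelism-min = 64
    |  parallelism-factor = 8.0
    |  parallelism-max = 512
    |}
  """.stripMargin)

val actorSystem = ActorSystem(
  UUID.randomUUID().toString,
  dispatcherOverride.withFallback(ConfigFactory.load("application.conf")))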

Best,

Arvid

On Mon, Jul 6, 2020 at 1:45 PM Benchao Li  wrote:

> Hi Mark,
>
> According to your data, I think the config of AsyncOperator is OK.
> There is one more config that might affect the throughput of
> AsyncOperator, it's watermark.
> Because unordered async operator still keeps the order between watermarks,
> did you use
> event time in your job, and if yes, what's the watermark interval in your
> job?
>
> Mark Zitnik  于2020年7月5日周日 下午7:44写道:
>
>> Hi Benchao
>>
>> The capacity is 100
>> Parallelism is 8
>> Rpc req is 20ms
>>
>> Thanks
>>
>>
>> On Sun, 5 Jul 2020, 6:16 Benchao Li,  wrote:
>>
>>> Hi Mark,
>>>
>>> Could you give more details about your Flink job?
>>> - the capacity of AsyncDataStream
>>> - the parallelism of AsyncDataStream operator
>>> - the time of per blocked rpc request
>>>
>>> Mark Zitnik  于2020年7月5日周日 上午3:48写道:
>>>
 Hi

 In my flink application I need to enrich data using 
 AsyncDataStream.unorderedWait
 but I am getting poor performance. At the beginning I was just working with
 an http call, but I have switched to grpc. I am running on an 8-core node and
 getting a total of 3200 events per second; my service that I am using is not
 fully utilized and can produce up to 1 req/sec

 Flink job flow
 Reading from Kafka ~> some enrichment with unoderedwait ~> map ~> write
 to Kafka

 Using Akkad grpc code written in scala

 Thanks

>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>
>
> --
>
> Best,
> Benchao Li
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Logging Flink metrics

2020-07-06 Thread Manish G
Ok, got it.
I would try to do it manually.

Thanks a lot for your inputs and efforts.

With regards

On Mon, Jul 6, 2020 at 10:58 PM Chesnay Schepler  wrote:

> WSL is a bit buggy when it comes to allocating ports; it happily lets 2
> processes create sockets on the same port, except that the latter one
> doesn't do anything.
> Super annoying, and I haven't found a solution to that myself yet.
>
> You'll have to configure the ports explicitly for the JM/TM, which will
> likely entail manually starting the processes and updating the
> configuration in-between, e.g.:
>
> ./bin/jobmanager.sh start
> 
> ./bin/taskmanager.sh start
>
> On 06/07/2020 19:16, Manish G wrote:
>
> Yes.
>
> On Mon, Jul 6, 2020 at 10:43 PM Chesnay Schepler 
> wrote:
>
>> Are you running Flink in WSL by chance?
>>
>> On 06/07/2020 19:06, Manish G wrote:
>>
>> In flink-conf.yaml:
>> metrics.reporter.prom.port: 9250-9260
>>
>> This is based on information provided here
>>
>> port - (optional) the port the Prometheus exporter listens on, defaults
>> to 9249.
>> In order to be able to run several instances of the reporter on one host
>> (e.g. when one TaskManager is colocated with the JobManager) it is
>> advisable to use a port range like 9250-9260.
>>
>> As I am running flink locally, so both jobmanager and taskmanager are
>> colocated.
>>
>> In prometheus.yml:
>>
>>
>>
>>
>> - job_name: 'flinkprometheus'
>>   scrape_interval: 5s
>>   static_configs:
>>     - targets: ['localhost:9250', 'localhost:9251']
>>   metrics_path: /
>>
>> This is the whole configuration I have done based on several tutorials
>> and blogs available online.
>>
>>
>>
>>
>> On Mon, Jul 6, 2020 at 10:20 PM Chesnay Schepler 
>> wrote:
>>
>>> These are all JobManager metrics; have you configured prometheus to also
>>> scrape the task manager processes?
>>>
>>> On 06/07/2020 18:35, Manish G wrote:
>>>
>>> The metrics I see on prometheus is like:
>>>
>>> # HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp 
>>> lastCheckpointRestoreTimestamp (scope: jobmanager_job)
>>> # TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge
>>> flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>>>  -1.0
>>> # HELP flink_jobmanager_job_numberOfFailedCheckpoints 
>>> numberOfFailedCheckpoints (scope: jobmanager_job)
>>> # TYPE flink_jobmanager_job_numberOfFailedCheckpoints gauge
>>> flink_jobmanager_job_numberOfFailedCheckpoints{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>>>  0.0
>>> # HELP flink_jobmanager_Status_JVM_Memory_Heap_Max Max (scope: 
>>> jobmanager_Status_JVM_Memory_Heap)
>>> # TYPE flink_jobmanager_Status_JVM_Memory_Heap_Max gauge
>>> flink_jobmanager_Status_JVM_Memory_Heap_Max{host="localhost",} 1.029177344E9
>>> # HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count 
>>> Count (scope: jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep)
>>> # TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count gauge
>>> flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count{host="localhost",}
>>>  2.0
>>> # HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: 
>>> jobmanager_Status_JVM_CPU)
>>> # TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
>>> flink_jobmanager_Status_JVM_CPU_Time{host="localhost",} 8.42E9
>>> # HELP flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity 
>>> TotalCapacity (scope: jobmanager_Status_JVM_Memory_Direct)
>>> # TYPE flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity gauge
>>> flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity{host="localhost",} 
>>> 604064.0
>>> # HELP flink_jobmanager_job_fullRestarts fullRestarts (scope: 
>>> jobmanager_job)
>>> # TYPE flink_jobmanager_job_fullRestarts gauge
>>> flink_jobmanager_job_fullRestarts{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>>>  0.0
>>>
>>>
>>>
>>>
>>> On Mon, Jul 6, 2020 at 9:51 PM Chesnay Schepler 
>>> wrote:
>>>
 You've said elsewhere that you do see some metrics in prometheus, which
 are those?

 Why are you configuring the host for the prometheus reporter? This
 option is only for the PrometheusPushGatewayReporter.

 On 06/07/2020 18:01, Manish G wrote:

 Hi,

 So I have following in flink-conf.yml :
 //
 metrics.reporter.prom.class:
 org.apache.flink.metrics.prometheus.PrometheusReporter
 metrics.reporter.prom.host: 127.0.0.1
 metrics.reporter.prom.port: 
 metrics.reporter.slf4j.class:
 org.apache.flink.metrics.slf4j.Slf4jReporter
 metrics.reporter.slf4j.interval: 30 SECONDS
 ///

Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
WSL is a bit buggy when it comes to allocating ports; it happily lets 2 
processes create sockets on the same port, except that the latter one 
doesn't do anything.

Super annoying, and I haven't found a solution to that myself yet.

You'll have to configure the ports explicitly for the JM/TM, which will 
likely entail manually starting the processes and updating the 
configuration in-between, e.g.:


./bin/jobmanager.sh start

./bin/taskmanager.sh start
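
Concretely, the manual sequence could look like this, reusing the
metrics.reporter.prom.port option discussed in this thread (the port values
are illustrative):

# flink-conf.yaml before starting the JobManager
metrics.reporter.prom.port: 9250
./bin/jobmanager.sh start

# edit flink-conf.yaml again before starting the TaskManager
metrics.reporter.prom.port: 9251
./bin/taskmanager.sh start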

On 06/07/2020 19:16, Manish G wrote:

Yes.

On Mon, Jul 6, 2020 at 10:43 PM Chesnay Schepler > wrote:


Are you running Flink is WSL by chance?

On 06/07/2020 19:06, Manish G wrote:

In flink-conf.yaml:
metrics.reporter.prom.port: 9250-9260

This is based on information provided here

port - (optional) the port the Prometheus exporter listens on,
defaults to 9249.
In order to be able to run several instances of the reporter on
one host (e.g. when one TaskManager is colocated with the
JobManager) it is advisable to use a port range like 9250-9260.

As I am running flink locally, so both jobmanager and taskmanager
are colocated.

In prometheus.yml:
- job_name: 'flinkprometheus'
    scrape_interval: 5s
    static_configs:
      - targets: ['localhost:9250', 'localhost:9251']
    metrics_path: /

This is the whole configuration I have done based on several
tutorials and blogs available online.


On Mon, Jul 6, 2020 at 10:20 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

These are all JobManager metrics; have you configured
prometheus to also scrape the task manager processes?

On 06/07/2020 18:35, Manish G wrote:

The metrics I see on prometheus is like:
# HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp 
lastCheckpointRestoreTimestamp (scope: jobmanager_job)
# TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge

flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 -1.0
# HELP flink_jobmanager_job_numberOfFailedCheckpoints 
numberOfFailedCheckpoints (scope: jobmanager_job)
# TYPE flink_jobmanager_job_numberOfFailedCheckpoints gauge

flink_jobmanager_job_numberOfFailedCheckpoints{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0
# HELP flink_jobmanager_Status_JVM_Memory_Heap_Max Max (scope: 
jobmanager_Status_JVM_Memory_Heap)
# TYPE flink_jobmanager_Status_JVM_Memory_Heap_Max gauge
flink_jobmanager_Status_JVM_Memory_Heap_Max{host="localhost",} 
1.029177344E9
# HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count 
Count (scope: jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep)
# TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count 
gauge

flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count{host="localhost",}
 2.0
# HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: 
jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",} 8.42E9
# HELP flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity 
TotalCapacity (scope: jobmanager_Status_JVM_Memory_Direct)
# TYPE flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity gauge

flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity{host="localhost",} 
604064.0
# HELP flink_jobmanager_job_fullRestarts fullRestarts (scope: 
jobmanager_job)
# TYPE flink_jobmanager_job_fullRestarts gauge

flink_jobmanager_job_fullRestarts{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0



On Mon, Jul 6, 2020 at 9:51 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

You've said elsewhere that you do see some metrics in
prometheus, which are those?

Why are you configuring the host for the prometheus
reporter? This option is only for the
PrometheusPushGatewayReporter.

On 06/07/2020 18:01, Manish G wrote:

Hi,

So I have following in flink-conf.yml :
//
metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.host: 127.0.0.1
metrics.reporter.prom.port: 
metrics.reporter.slf4j.class:
org.apache.flink.metrics.slf4j.Slf4jReporter
  

Re: Logging Flink metrics

2020-07-06 Thread Manish G
Yes.

On Mon, Jul 6, 2020 at 10:43 PM Chesnay Schepler  wrote:

> Are you running Flink in WSL by chance?
>
> On 06/07/2020 19:06, Manish G wrote:
>
> In flink-conf.yaml:
> metrics.reporter.prom.port: 9250-9260
>
> This is based on information provided here
>
> port - (optional) the port the Prometheus exporter listens on, defaults
> to 9249.
> In order to be able to run several instances of the reporter on one host
> (e.g. when one TaskManager is colocated with the JobManager) it is
> advisable to use a port range like 9250-9260.
>
> As I am running flink locally, so both jobmanager and taskmanager are
> colocated.
>
> In prometheus.yml:
>
> - job_name: 'flinkprometheus'
>   scrape_interval: 5s
>   static_configs:
>     - targets: ['localhost:9250', 'localhost:9251']
>   metrics_path: /
>
> This is the whole configuration I have done based on several tutorials and
> blogs available online.
>
>
>
>
> On Mon, Jul 6, 2020 at 10:20 PM Chesnay Schepler 
> wrote:
>
>> These are all JobManager metrics; have you configured prometheus to also
>> scrape the task manager processes?
>>
>> On 06/07/2020 18:35, Manish G wrote:
>>
>> The metrics I see on prometheus is like:
>>
>> # HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp 
>> lastCheckpointRestoreTimestamp (scope: jobmanager_job)
>> # TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge
>> flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>>  -1.0
>> # HELP flink_jobmanager_job_numberOfFailedCheckpoints 
>> numberOfFailedCheckpoints (scope: jobmanager_job)
>> # TYPE flink_jobmanager_job_numberOfFailedCheckpoints gauge
>> flink_jobmanager_job_numberOfFailedCheckpoints{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>>  0.0
>> # HELP flink_jobmanager_Status_JVM_Memory_Heap_Max Max (scope: 
>> jobmanager_Status_JVM_Memory_Heap)
>> # TYPE flink_jobmanager_Status_JVM_Memory_Heap_Max gauge
>> flink_jobmanager_Status_JVM_Memory_Heap_Max{host="localhost",} 1.029177344E9
>> # HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count Count 
>> (scope: jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep)
>> # TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count gauge
>> flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count{host="localhost",}
>>  2.0
>> # HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: 
>> jobmanager_Status_JVM_CPU)
>> # TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
>> flink_jobmanager_Status_JVM_CPU_Time{host="localhost",} 8.42E9
>> # HELP flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity TotalCapacity 
>> (scope: jobmanager_Status_JVM_Memory_Direct)
>> # TYPE flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity gauge
>> flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity{host="localhost",} 
>> 604064.0
>> # HELP flink_jobmanager_job_fullRestarts fullRestarts (scope: jobmanager_job)
>> # TYPE flink_jobmanager_job_fullRestarts gauge
>> flink_jobmanager_job_fullRestarts{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>>  0.0
>>
>>
>>
>>
>> On Mon, Jul 6, 2020 at 9:51 PM Chesnay Schepler 
>> wrote:
>>
>>> You've said elsewhere that you do see some metrics in prometheus, which
>>> are those?
>>>
>>> Why are you configuring the host for the prometheus reporter? This
>>> option is only for the PrometheusPushGatewayReporter.
>>>
>>> On 06/07/2020 18:01, Manish G wrote:
>>>
>>> Hi,
>>>
>>> So I have following in flink-conf.yml :
>>> //
>>> metrics.reporter.prom.class:
>>> org.apache.flink.metrics.prometheus.PrometheusReporter
>>> metrics.reporter.prom.host: 127.0.0.1
>>> metrics.reporter.prom.port: 
>>> metrics.reporter.slf4j.class:
>>> org.apache.flink.metrics.slf4j.Slf4jReporter
>>> metrics.reporter.slf4j.interval: 30 SECONDS
>>> //
>>>
>>> And while I can see custom metrics in Taskmanager logs, but prometheus
>>> dashboard logs doesn't show custom metrics.
>>>
>>> With regards
>>>
>>> On Mon, Jul 6, 2020 at 8:55 PM Chesnay Schepler 
>>> wrote:
>>>
 You have explicitly configured a reporter list, resulting in the slf4j
 reporter being ignored:

 2020-07-06 13:48:22,191 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
 configuration property: metrics.reporters, prom
 2020-07-06 13:48:23,203 INFO
 org.apache.flink.runtime.metrics.ReporterSetup- Excluding
 reporter slf4j, not configured in reporter list (prom).

 Note that nowadays metrics.reporters is no longer required; the set of
 reporters is

Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler

Are you running Flink in WSL by chance?

On 06/07/2020 19:06, Manish G wrote:

In flink-conf.yaml:
metrics.reporter.prom.port: 9250-9260

This is based on information provided here

port - (optional) the port the Prometheus exporter listens on,
defaults to 9249.
In order to be able to run several instances of the reporter on one
host (e.g. when one TaskManager is colocated with the JobManager) it
is advisable to use a port range like 9250-9260.

As I am running flink locally, so both jobmanager and taskmanager are
colocated.

In prometheus.yml:
- job_name: 'flinkprometheus'
    scrape_interval: 5s
    static_configs:
      - targets: ['localhost:9250', 'localhost:9251']
    metrics_path: /

This is the whole configuration I have done based on several tutorials
and blogs available online.


On Mon, Jul 6, 2020 at 10:20 PM Chesnay Schepler > wrote:


These are all JobManager metrics; have you configured prometheus
to also scrape the task manager processes?

On 06/07/2020 18:35, Manish G wrote:

The metrics I see on prometheus is like:
# HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp 
lastCheckpointRestoreTimestamp (scope: jobmanager_job)
# TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge

flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 -1.0
# HELP flink_jobmanager_job_numberOfFailedCheckpoints 
numberOfFailedCheckpoints (scope: jobmanager_job)
# TYPE flink_jobmanager_job_numberOfFailedCheckpoints gauge

flink_jobmanager_job_numberOfFailedCheckpoints{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0
# HELP flink_jobmanager_Status_JVM_Memory_Heap_Max Max (scope: 
jobmanager_Status_JVM_Memory_Heap)
# TYPE flink_jobmanager_Status_JVM_Memory_Heap_Max gauge
flink_jobmanager_Status_JVM_Memory_Heap_Max{host="localhost",} 1.029177344E9
# HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count 
Count (scope: jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep)
# TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count gauge

flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count{host="localhost",}
 2.0
# HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: 
jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",} 8.42E9
# HELP flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity 
TotalCapacity (scope: jobmanager_Status_JVM_Memory_Direct)
# TYPE flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity gauge
flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity{host="localhost",} 
604064.0
# HELP flink_jobmanager_job_fullRestarts fullRestarts (scope: 
jobmanager_job)
# TYPE flink_jobmanager_job_fullRestarts gauge

flink_jobmanager_job_fullRestarts{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0



On Mon, Jul 6, 2020 at 9:51 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

You've said elsewhere that you do see some metrics in
prometheus, which are those?

Why are you configuring the host for the prometheus reporter?
This option is only for the PrometheusPushGatewayReporter.

On 06/07/2020 18:01, Manish G wrote:

Hi,

So I have following in flink-conf.yml :
//
metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.host: 127.0.0.1
metrics.reporter.prom.port: 
metrics.reporter.slf4j.class:
org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS
//

And while I can see custom metrics in Taskmanager logs, but
prometheus dashboard logs doesn't show custom metrics.

With regards

On Mon, Jul 6, 2020 at 8:55 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

You have explicitly configured a reporter list,
resulting in the slf4j reporter being ignored:

2020-07-06 13:48:22,191 INFO
org.apache.flink.configuration.GlobalConfiguration -
Loading configuration property: metrics.reporters, prom
2020-07-06 13:48:23,203 INFO
org.apache.flink.runtime.metrics.ReporterSetup -
Excluding reporter slf4j, not configured in reporter
list (prom).

Note that nowaday

Re: Logging Flink metrics

2020-07-06 Thread Manish G
In flink-conf.yaml:
metrics.reporter.prom.port: 9250-9260

This is based on information provided here

port - (optional) the port the Prometheus exporter listens on, defaults
to 9249.
In order to be able to run several instances of the reporter on one host
(e.g. when one TaskManager is colocated with the JobManager) it is
advisable to use a port range like 9250-9260.

As I am running flink locally, so both jobmanager and taskmanager are
colocated.

In prometheus.yml:

- job_name: 'flinkprometheus'
  scrape_interval: 5s
  static_configs:
    - targets: ['localhost:9250', 'localhost:9251']
  metrics_path: /

This is the whole configuration I have done based on several tutorials and
blogs available online.




On Mon, Jul 6, 2020 at 10:20 PM Chesnay Schepler  wrote:

> These are all JobManager metrics; have you configured prometheus to also
> scrape the task manager processes?
>
> On 06/07/2020 18:35, Manish G wrote:
>
> The metrics I see on prometheus is like:
>
> # HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp 
> lastCheckpointRestoreTimestamp (scope: jobmanager_job)
> # TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge
> flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>  -1.0
> # HELP flink_jobmanager_job_numberOfFailedCheckpoints 
> numberOfFailedCheckpoints (scope: jobmanager_job)
> # TYPE flink_jobmanager_job_numberOfFailedCheckpoints gauge
> flink_jobmanager_job_numberOfFailedCheckpoints{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>  0.0
> # HELP flink_jobmanager_Status_JVM_Memory_Heap_Max Max (scope: 
> jobmanager_Status_JVM_Memory_Heap)
> # TYPE flink_jobmanager_Status_JVM_Memory_Heap_Max gauge
> flink_jobmanager_Status_JVM_Memory_Heap_Max{host="localhost",} 1.029177344E9
> # HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count Count 
> (scope: jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep)
> # TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count gauge
> flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count{host="localhost",}
>  2.0
> # HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: 
> jobmanager_Status_JVM_CPU)
> # TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
> flink_jobmanager_Status_JVM_CPU_Time{host="localhost",} 8.42E9
> # HELP flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity TotalCapacity 
> (scope: jobmanager_Status_JVM_Memory_Direct)
> # TYPE flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity gauge
> flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity{host="localhost",} 
> 604064.0
> # HELP flink_jobmanager_job_fullRestarts fullRestarts (scope: jobmanager_job)
> # TYPE flink_jobmanager_job_fullRestarts gauge
> flink_jobmanager_job_fullRestarts{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
>  0.0
>
>
>
>
> On Mon, Jul 6, 2020 at 9:51 PM Chesnay Schepler 
> wrote:
>
>> You've said elsewhere that you do see some metrics in prometheus, which
>> are those?
>>
>> Why are you configuring the host for the prometheus reporter? This
>> option is only for the PrometheusPushGatewayReporter.
>>
>> On 06/07/2020 18:01, Manish G wrote:
>>
>> Hi,
>>
>> So I have following in flink-conf.yml :
>> //
>> metrics.reporter.prom.class:
>> org.apache.flink.metrics.prometheus.PrometheusReporter
>> metrics.reporter.prom.host: 127.0.0.1
>> metrics.reporter.prom.port: 
>> metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
>> metrics.reporter.slf4j.interval: 30 SECONDS
>> //
>>
>> And while I can see custom metrics in Taskmanager logs, but prometheus
>> dashboard logs doesn't show custom metrics.
>>
>> With regards
>>
>> On Mon, Jul 6, 2020 at 8:55 PM Chesnay Schepler 
>> wrote:
>>
>>> You have explicitly configured a reporter list, resulting in the slf4j
>>> reporter being ignored:
>>>
>>> 2020-07-06 13:48:22,191 INFO
>>> org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: metrics.reporters, prom
>>> 2020-07-06 13:48:23,203 INFO
>>> org.apache.flink.runtime.metrics.ReporterSetup- Excluding
>>> reporter slf4j, not configured in reporter list (prom).
>>>
>>> Note that nowadays metrics.reporters is no longer required; the set of
>>> reporters is automatically determined based on configured properties; the
>>> only use-case is disabling a reporter without having to remove the entire
>>> configuration.
>>> I'd suggest to just remove the option, try again, and report back.
>>>
>>> On 06/07/2020 16:35, Chesnay Schepler wrote:
>>>
>>> Please

Flink Parallelism for various type of transformation

2020-07-06 Thread Prasanna kumar
Hi ,

I used t2.medium machines for the task manager nodes. It has 2 CPU and 4GB
memory.

But the task manager screen shows that there are 4 slots.

Generally we should match the number of slots to the number of cores.

[image: image.png]

Our pipeline is Source -> Simple Transform -> Sink.

What happens when we have more slots than cores in following scenarios?
1) The transform is just changing of json format.

2)  When the transformation is done by hitting another server (HTTP
request)

Thanks,
Prasanna.
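
For reference, the slot count per TaskManager is set explicitly in
flink-conf.yaml; matching it to the 2 vCPUs of a t2.medium would look like:

taskmanager.numberOfTaskSlots: 2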


Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
These are all JobManager metrics; have you configured prometheus to also 
scrape the task manager processes?


On 06/07/2020 18:35, Manish G wrote:

The metrics I see on prometheus is like:
# HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp 
lastCheckpointRestoreTimestamp (scope: jobmanager_job)
# TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge
flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 -1.0
# HELP flink_jobmanager_job_numberOfFailedCheckpoints numberOfFailedCheckpoints 
(scope: jobmanager_job)
# TYPE flink_jobmanager_job_numberOfFailedCheckpoints gauge
flink_jobmanager_job_numberOfFailedCheckpoints{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0
# HELP flink_jobmanager_Status_JVM_Memory_Heap_Max Max (scope: 
jobmanager_Status_JVM_Memory_Heap)
# TYPE flink_jobmanager_Status_JVM_Memory_Heap_Max gauge
flink_jobmanager_Status_JVM_Memory_Heap_Max{host="localhost",} 1.029177344E9
# HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count Count 
(scope: jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep)
# TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count gauge
flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count{host="localhost",}
 2.0
# HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: 
jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",} 8.42E9
# HELP flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity TotalCapacity 
(scope: jobmanager_Status_JVM_Memory_Direct)
# TYPE flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity gauge
flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity{host="localhost",} 
604064.0
# HELP flink_jobmanager_job_fullRestarts fullRestarts (scope: jobmanager_job)
# TYPE flink_jobmanager_job_fullRestarts gauge
flink_jobmanager_job_fullRestarts{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0



On Mon, Jul 6, 2020 at 9:51 PM Chesnay Schepler > wrote:


You've said elsewhere that you do see some metrics in prometheus,
which are those?

Why are you configuring the host for the prometheus reporter? This
option is only for the PrometheusPushGatewayReporter.

On 06/07/2020 18:01, Manish G wrote:

Hi,

So I have following in flink-conf.yml :
//
metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.host: 127.0.0.1
metrics.reporter.prom.port: 
metrics.reporter.slf4j.class:
org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS
//

And while I can see custom metrics in Taskmanager logs, but
prometheus dashboard logs doesn't show custom metrics.

With regards

On Mon, Jul 6, 2020 at 8:55 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

You have explicitly configured a reporter list, resulting in
the slf4j reporter being ignored:

2020-07-06 13:48:22,191 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: metrics.reporters, prom
2020-07-06 13:48:23,203 INFO
org.apache.flink.runtime.metrics.ReporterSetup - Excluding
reporter slf4j, not configured in reporter list (prom).

Note that nowadays metrics.reporters is no longer required;
the set of reporters is automatically determined based on
configured properties; the only use-case is disabling a
reporter without having to remove the entire configuration.
I'd suggest to just remove the option, try again, and report
back.

On 06/07/2020 16:35, Chesnay Schepler wrote:

Please enable debug logging and search for warnings from the
metric groups/registry/reporter.

If you cannot find anything suspicious, you can also send
the foll log to me directly.

On 06/07/2020 16:29, Manish G wrote:

Job is an infinite streaming one, so it keeps going. Flink
configuration is as:

metrics.reporter.slf4j.class:
org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS



On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

How long did the job run for, and what is the
configured interval?


On 06/07/2020 15:51, Manish G wrote:

Hi,

Thanks for this.

I did the configuration as mentioned at the
link(changes in flink-conf.yml, copying the jar in lib
directory), and registered the Meter with metrics
group and invoked 

Re: Kafka Rate Limit in FlinkConsumer ?

2020-07-06 Thread Chen Qin
My two cents here,

- A Flink job already has backpressure, so rate limiting can be done by
setting the parallelism to a proper number in some use cases. There is an
open issue about checkpointing reliability under backpressure; the community
seems to be working on it.

- Rate limiting can be abused easily and cause a lot of confusion. Think about
a use case where you have two streams doing a simple interval join. Unless you
are able to rate limit both with proper values dynamically, you might see
timestamp and watermark gaps keep increasing, causing checkpointing failures.

So the question might be: instead of looking at the rate limit of one source,
how to slow down all sources without ever-increasing time and watermark gaps.
It sounds complicated already.

With that being said, if you really want to have a rate limit on your own, you
can try the following code :) It works well for us.
public class SynchronousKafkaConsumer<T> extends FlinkKafkaConsumer<T> {

  protected static final Logger LOG =
      LoggerFactory.getLogger(SynchronousKafkaConsumer.class);

  private final double topicRateLimit;
  private transient RateLimiter subtaskRateLimiter;

  @Override
  public void open(Configuration configuration) throws Exception {
    Preconditions.checkArgument(
        topicRateLimit / getRuntimeContext().getNumberOfParallelSubtasks() > 0.1,
        "subtask ratelimit should be greater than 0.1 QPS");
    subtaskRateLimiter = RateLimiter.create(
        topicRateLimit / getRuntimeContext().getNumberOfParallelSubtasks());
    super.open(configuration);
  }

  @Override
  protected AbstractFetcher<T, ?> createFetcher(
      SourceContext<T> sourceContext,
      Map<KafkaTopicPartition, Long> partitionsWithOffsets,
      SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
      SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
      StreamingRuntimeContext runtimeContext,
      OffsetCommitMode offsetCommitMode,
      MetricGroup consumerMetricGroup, boolean useMetrics)
      throws Exception {

    return new KafkaFetcher<T>(
        sourceContext,
        partitionsWithOffsets,
        watermarksPeriodic,
        watermarksPunctuated,
        runtimeContext.getProcessingTimeService(),
        runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
        runtimeContext.getUserCodeClassLoader(),
        runtimeContext.getTaskNameWithSubtasks(),
        deserializer,
        properties,
        pollTimeout,
        runtimeContext.getMetricGroup(),
        consumerMetricGroup,
        useMetrics) {
      @Override
      protected void emitRecord(T record,
                                KafkaTopicPartitionState<TopicPartition> partitionState,
                                long offset) throws Exception {
        subtaskRateLimiter.acquire();
        if (record == null) {
          consumerMetricGroup.counter("invalidRecord").inc();
        }
        super.emitRecord(record, partitionState, offset);
      }

      @Override
      protected void emitRecordWithTimestamp(T record,
                                             KafkaTopicPartitionState<TopicPartition> partitionState,
                                             long offset, long timestamp) throws Exception {
        subtaskRateLimiter.acquire();
        if (record == null) {
          consumerMetricGroup.counter("invalidRecord").inc();
        }
        super.emitRecordWithTimestamp(record, partitionState, offset, timestamp);
      }
    };
  }
}
Thanks,

Chen
Pinterest Data
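
A hypothetical sketch of wiring the consumer above into a job; the constructor
shown here (topic, schema, properties, rate) and all values are assumptions,
since the snippet does not include the constructor:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "kafka:9092")
props.setProperty("group.id", "s3-writer")

// 1000.0 records/second across all subtasks is an illustrative value; the
// constructor signature is assumed, not taken from the snippet above.
val source = new SynchronousKafkaConsumer[String]("events", new SimpleStringSchema(), props, 1000.0)
env.addSource(source).name("rate-limited-kafka-source")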


> On Jul 6, 2020, at 7:43 AM, David Magalhães  wrote:
> 
> I've noticed that this FLINK-11501 was implemented in 
> flink-connector-kafka-0.10 [1], but it wasn't in the current version of the 
> flink-connector-kafka. Is there any reason for this, and what would be the 
> best way to implement rate-limit functionality in the current Kafka 
> consumer?
> 
> Thanks,
> David
> 
> [1] 
> https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
>  
> 
> 
> [2] 
> https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
>  
> 


Re: Logging Flink metrics

2020-07-06 Thread Manish G
The metrics I see in Prometheus look like this:

# HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp
lastCheckpointRestoreTimestamp (scope: jobmanager_job)
# TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge
flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
-1.0
# HELP flink_jobmanager_job_numberOfFailedCheckpoints
numberOfFailedCheckpoints (scope: jobmanager_job)
# TYPE flink_jobmanager_job_numberOfFailedCheckpoints gauge
flink_jobmanager_job_numberOfFailedCheckpoints{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
0.0
# HELP flink_jobmanager_Status_JVM_Memory_Heap_Max Max (scope:
jobmanager_Status_JVM_Memory_Heap)
# TYPE flink_jobmanager_Status_JVM_Memory_Heap_Max gauge
flink_jobmanager_Status_JVM_Memory_Heap_Max{host="localhost",} 1.029177344E9
# HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count
Count (scope: jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep)
# TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count gauge
flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count{host="localhost",}
2.0
# HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope:
jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",} 8.42E9
# HELP flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity
TotalCapacity (scope: jobmanager_Status_JVM_Memory_Direct)
# TYPE flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity gauge
flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity{host="localhost",}
604064.0
# HELP flink_jobmanager_job_fullRestarts fullRestarts (scope: jobmanager_job)
# TYPE flink_jobmanager_job_fullRestarts gauge
flink_jobmanager_job_fullRestarts{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
0.0



On Mon, Jul 6, 2020 at 9:51 PM Chesnay Schepler  wrote:

> You've said elsewhere that you do see some metrics in prometheus, which
> are those?
>
> Why are you configuring the host for the prometheus reporter? This option
> is only for the PrometheusPushGatewayReporter.
>
> On 06/07/2020 18:01, Manish G wrote:
>
> Hi,
>
> So I have following in flink-conf.yml :
> //
> metrics.reporter.prom.class:
> org.apache.flink.metrics.prometheus.PrometheusReporter
> metrics.reporter.prom.host: 127.0.0.1
> metrics.reporter.prom.port: 
> metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
> metrics.reporter.slf4j.interval: 30 SECONDS
> //
>
> And while I can see custom metrics in Taskmanager logs, but prometheus
> dashboard logs doesn't show custom metrics.
>
> With regards
>
> On Mon, Jul 6, 2020 at 8:55 PM Chesnay Schepler 
> wrote:
>
>> You have explicitly configured a reporter list, resulting in the slf4j
>> reporter being ignored:
>>
>> 2020-07-06 13:48:22,191 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: metrics.reporters, prom
>> 2020-07-06 13:48:23,203 INFO
>> org.apache.flink.runtime.metrics.ReporterSetup- Excluding
>> reporter slf4j, not configured in reporter list (prom).
>>
>> Note that nowadays metrics.reporters is no longer required; the set of
>> reporters is automatically determined based on configured properties; the
>> only use-case is disabling a reporter without having to remove the entire
>> configuration.
>> I'd suggest to just remove the option, try again, and report back.
>>
>> On 06/07/2020 16:35, Chesnay Schepler wrote:
>>
>> Please enable debug logging and search for warnings from the metric
>> groups/registry/reporter.
>>
>> If you cannot find anything suspicious, you can also send the foll log to
>> me directly.
>>
>> On 06/07/2020 16:29, Manish G wrote:
>>
>> Job is an infinite streaming one, so it keeps going. Flink configuration
>> is as:
>>
>> metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
>> metrics.reporter.slf4j.interval: 30 SECONDS
>>
>>
>>
>> On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler 
>> wrote:
>>
>>> How long did the job run for, and what is the configured interval?
>>>
>>>
>>> On 06/07/2020 15:51, Manish G wrote:
>>>
>>> Hi,
>>>
>>> Thanks for this.
>>>
>>> I did the configuration as mentioned at the link(changes in
>>> flink-conf.yml, copying the jar in lib directory), and registered the Meter
>>> with metrics group and invoked markEvent() method in the target code. But I
>>> don't see any related logs.
>>> I am doing this all on my local computer.
>>>
>>> Anything else I need to do?
>>>
>>> With regards
>>> Manish
>>>
>>> On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler 
>>> wrote:
>>>
 Have you looked at the SLF4J reporter?


 https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetrics

Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
You've said elsewhere that you do see some metrics in prometheus, which 
are those?


Why are you configuring the host for the prometheus reporter? This 
option is only for the PrometheusPushGatewayReporter.


On 06/07/2020 18:01, Manish G wrote:

Hi,

So I have following in flink-conf.yml :
//
metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter

metrics.reporter.prom.host: 127.0.0.1
metrics.reporter.prom.port: 
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS
//

And while I can see custom metrics in Taskmanager logs, but prometheus 
dashboard logs doesn't show custom metrics.


With regards

On Mon, Jul 6, 2020 at 8:55 PM Chesnay Schepler wrote:


You have explicitly configured a reporter list, resulting in the
slf4j reporter being ignored:

2020-07-06 13:48:22,191 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: metrics.reporters, prom
2020-07-06 13:48:23,203 INFO
org.apache.flink.runtime.metrics.ReporterSetup - Excluding
reporter slf4j, not configured in reporter list (prom).

Note that nowadays metrics.reporters is no longer required; the
set of reporters is automatically determined based on configured
properties; the only use-case is disabling a reporter without
having to remove the entire configuration.
I'd suggest to just remove the option, try again, and report back.

On 06/07/2020 16:35, Chesnay Schepler wrote:

Please enable debug logging and search for warnings from the
metric groups/registry/reporter.

If you cannot find anything suspicious, you can also send the
foll log to me directly.

On 06/07/2020 16:29, Manish G wrote:

Job is an infinite streaming one, so it keeps going. Flink
configuration is as:

metrics.reporter.slf4j.class:
org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS



On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler
wrote:

How long did the job run for, and what is the configured
interval?


On 06/07/2020 15:51, Manish G wrote:

Hi,

Thanks for this.

I did the configuration as mentioned at the link(changes in
flink-conf.yml, copying the jar in lib directory), and
registered the Meter with metrics group and invoked
markEvent() method in the target code. But I don't see any
related logs.
I am doing this all on my local computer.

Anything else I need to do?

With regards
Manish

On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler
wrote:

Have you looked at the SLF4J reporter?


https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

On 06/07/2020 13:49, Manish G wrote:
> Hi,
>
> Is it possible to log Flink metrics in application
logs apart from
> publishing it to Prometheus?
>
> With regards












Re: Logging Flink metrics

2020-07-06 Thread Manish G
Hi,

So I have the following in flink-conf.yaml:
//
metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.host: 127.0.0.1
metrics.reporter.prom.port: 
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS
//

And while I can see the custom metrics in the TaskManager logs, the Prometheus
dashboard doesn't show them.

With regards

On Mon, Jul 6, 2020 at 8:55 PM Chesnay Schepler  wrote:

> You have explicitly configured a reporter list, resulting in the slf4j
> reporter being ignored:
>
> 2020-07-06 13:48:22,191 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: metrics.reporters, prom
> 2020-07-06 13:48:23,203 INFO
> org.apache.flink.runtime.metrics.ReporterSetup- Excluding
> reporter slf4j, not configured in reporter list (prom).
>
> Note that nowadays metrics.reporters is no longer required; the set of
> reporters is automatically determined based on configured properties; the
> only use-case is disabling a reporter without having to remove the entire
> configuration.
> I'd suggest to just remove the option, try again, and report back.
>
> On 06/07/2020 16:35, Chesnay Schepler wrote:
>
> Please enable debug logging and search for warnings from the metric
> groups/registry/reporter.
>
> If you cannot find anything suspicious, you can also send the foll log to
> me directly.
>
> On 06/07/2020 16:29, Manish G wrote:
>
> Job is an infinite streaming one, so it keeps going. Flink configuration
> is as:
>
> metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
> metrics.reporter.slf4j.interval: 30 SECONDS
>
>
>
> On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler 
> wrote:
>
>> How long did the job run for, and what is the configured interval?
>>
>>
>> On 06/07/2020 15:51, Manish G wrote:
>>
>> Hi,
>>
>> Thanks for this.
>>
>> I did the configuration as mentioned at the link(changes in
>> flink-conf.yml, copying the jar in lib directory), and registered the Meter
>> with metrics group and invoked markEvent() method in the target code. But I
>> don't see any related logs.
>> I am doing this all on my local computer.
>>
>> Anything else I need to do?
>>
>> With regards
>> Manish
>>
>> On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler 
>> wrote:
>>
>>> Have you looked at the SLF4J reporter?
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter
>>>
>>> On 06/07/2020 13:49, Manish G wrote:
>>> > Hi,
>>> >
>>> > Is it possible to log Flink metrics in application logs apart from
>>> > publishing it to Prometheus?
>>> >
>>> > With regards
>>>
>>>
>>>
>>
>
>


Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
You have explicitly configured a reporter list, resulting in the slf4j 
reporter being ignored:


2020-07-06 13:48:22,191 INFO 
org.apache.flink.configuration.GlobalConfiguration    - Loading 
configuration property: metrics.reporters, prom
2020-07-06 13:48:23,203 INFO 
org.apache.flink.runtime.metrics.ReporterSetup    - 
Excluding reporter slf4j, not configured in reporter list (prom).


Note that nowadays metrics.reporters is no longer required; the set of 
reporters is automatically determined based on configured properties; 
the only use-case is disabling a reporter without having to remove the 
entire configuration.

I'd suggest to just remove the option, try again, and report back.
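
To make the suggestion concrete, a minimal sketch of how the reporter section of
flink-conf.yaml could then look, with the metrics.reporters list and the prom.host
line removed so that both reporters are picked up (the port value is just the
reporter's usual default, shown here as a placeholder):

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS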

On 06/07/2020 16:35, Chesnay Schepler wrote:
Please enable debug logging and search for warnings from the metric 
groups/registry/reporter.


If you cannot find anything suspicious, you can also send the foll log 
to me directly.


On 06/07/2020 16:29, Manish G wrote:
Job is an infinite streaming one, so it keeps going. Flink 
configuration is as:


metrics.reporter.slf4j.class: 
org.apache.flink.metrics.slf4j.Slf4jReporter

metrics.reporter.slf4j.interval: 30 SECONDS



On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler wrote:


How long did the job run for, and what is the configured interval?


On 06/07/2020 15:51, Manish G wrote:

Hi,

Thanks for this.

I did the configuration as mentioned at the link(changes in
flink-conf.yml, copying the jar in lib directory), and
registered the Meter with metrics group and invoked markEvent()
method in the target code. But I don't see any related logs.
I am doing this all on my local computer.

Anything else I need to do?

With regards
Manish

On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler
wrote:

Have you looked at the SLF4J reporter?


https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

On 06/07/2020 13:49, Manish G wrote:
> Hi,
>
> Is it possible to log Flink metrics in application logs
apart from
> publishing it to Prometheus?
>
> With regards










Kafka Rate Limit in FlinkConsumer ?

2020-07-06 Thread David Magalhães
I've noticed that FLINK-11501 was implemented in flink-connector-kafka-0.10 [1],
but it wasn't implemented in the current version of flink-connector-kafka [2].
Is there any reason for this, and what would be the best way to implement rate
limit functionality in the current Kafka consumer?

Thanks,
David

[1]
https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java

[2]
https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java


Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
Please enable debug logging and search for warnings from the metric 
groups/registry/reporter.


If you cannot find anything suspicious, you can also send the full log
to me directly.


On 06/07/2020 16:29, Manish G wrote:
Job is an infinite streaming one, so it keeps going. Flink 
configuration is as:


metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS



On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler wrote:


How long did the job run for, and what is the configured interval?


On 06/07/2020 15:51, Manish G wrote:

Hi,

Thanks for this.

I did the configuration as mentioned at the link(changes in
flink-conf.yml, copying the jar in lib directory), and registered
the Meter with metrics group and invoked markEvent() method in
the target code. But I don't see any related logs.
I am doing this all on my local computer.

Anything else I need to do?

With regards
Manish

On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler
wrote:

Have you looked at the SLF4J reporter?


https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

On 06/07/2020 13:49, Manish G wrote:
> Hi,
>
> Is it possible to log Flink metrics in application logs
apart from
> publishing it to Prometheus?
>
> With regards








Re: Logging Flink metrics

2020-07-06 Thread Manish G
The job is an infinite streaming one, so it keeps running. The Flink configuration is
as follows:

metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS



On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler  wrote:

> How long did the job run for, and what is the configured interval?
>
>
> On 06/07/2020 15:51, Manish G wrote:
>
> Hi,
>
> Thanks for this.
>
> I did the configuration as mentioned at the link(changes in
> flink-conf.yml, copying the jar in lib directory), and registered the Meter
> with metrics group and invoked markEvent() method in the target code. But I
> don't see any related logs.
> I am doing this all on my local computer.
>
> Anything else I need to do?
>
> With regards
> Manish
>
> On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler 
> wrote:
>
>> Have you looked at the SLF4J reporter?
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter
>>
>> On 06/07/2020 13:49, Manish G wrote:
>> > Hi,
>> >
>> > Is it possible to log Flink metrics in application logs apart from
>> > publishing it to Prometheus?
>> >
>> > With regards
>>
>>
>>
>


Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler

How long did the job run for, and what is the configured interval?


On 06/07/2020 15:51, Manish G wrote:

Hi,

Thanks for this.

I did the configuration as mentioned at the link(changes in 
flink-conf.yml, copying the jar in lib directory), and registered the 
Meter with metrics group and invoked markEvent() method in the target 
code. But I don't see any related logs.

I am doing this all on my local computer.

Anything else I need to do?

With regards
Manish

On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler wrote:


Have you looked at the SLF4J reporter?


https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

On 06/07/2020 13:49, Manish G wrote:
> Hi,
>
> Is it possible to log Flink metrics in application logs apart from
> publishing it to Prometheus?
>
> With regards






Re: Logging Flink metrics

2020-07-06 Thread Manish G
Hi,

Thanks for this.

I did the configuration as mentioned at the link (changes in flink-conf.yaml,
copying the jar into the lib directory), registered the Meter with the metrics
group, and invoked the markEvent() method in the target code. But I don't see
any related logs.
I am doing this all on my local computer.

Anything else I need to do?

With regards
Manish
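
(For reference, a minimal sketch of the kind of Meter registration described above;
the metric name, the 60-second averaging window, and the surrounding map function are
assumptions for illustration, not the original code:)

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;

public class EventRateMapper extends RichMapFunction<String, String> {

  private transient Meter eventMeter;

  @Override
  public void open(Configuration parameters) {
    // Register a meter averaged over 60 seconds; it is reported by every
    // configured reporter (Prometheus, SLF4J, ...).
    eventMeter = getRuntimeContext()
        .getMetricGroup()
        .meter("myEventMeter", new MeterView(60));
  }

  @Override
  public String map(String value) {
    eventMeter.markEvent();
    return value;
  }
}

With the SLF4J reporter enabled, the meter's rate should then show up in the
TaskManager log at each report interval.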

On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler  wrote:

> Have you looked at the SLF4J reporter?
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter
>
> On 06/07/2020 13:49, Manish G wrote:
> > Hi,
> >
> > Is it possible to log Flink metrics in application logs apart from
> > publishing it to Prometheus?
> >
> > With regards
>
>
>


Re: Stateful Functions: Deploying to existing Cluster

2020-07-06 Thread Jan Brusch

Hi Igal,

thanks for the quick reply. That does make sense and I will give it a try.

It might also make sense to add that to the documentation.


Best regards and thanks!

Jan

On 06.07.20 14:02, Igal Shilman wrote:

Hi Jan,

Stateful functions would look at the java class path for the module.yaml,
So one way would be including the module.yaml in your 
src/main/resources/ directory.


Good luck,
Igal.


On Mon, Jul 6, 2020 at 12:39 PM Jan Brusch wrote:


Hi,

quick question about Deploying a Flink Stateful Functions
Application to
an existing cluster: The Documentation says to integrate
"statefun-flink-distribution" as additional maven Dependency in
the fat
jar.

(https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/deployment-and-operations/packaging.html#flink-jar)

But how and where do I upload my module.yml for external function
definitions in that scenario...?


Best regards

Jan


--
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501



Re: Stateful Functions: Routing to remote functions

2020-07-06 Thread Jan Brusch

Hi igal,

thanks for your comprehensive reply!

As for 1., I will try to create a minimal reproduction of the case and
share the code with you. It might be a few days until I get around to doing it.


As for 2., I will definitely give this a try. From the looks of it, this
seems to be the solution, and this was the error in my thinking: sending
unwrapped messages to external functions...



Best regards and many thanks!

Jan

On 06.07.20 14:11, Igal Shilman wrote:

Hi Jan,

Two followup questions:

1. Looking at the stack trace provided in your email, it does seem 
like the function type is unavailable, and I'd like to follow up on 
that: can you please share your Dockerfile, so
we have the complete picture. If you are not comfortable sharing that, 
then you can please try to execute into the container 
and manually validate that the module.yaml is present
both on the "worker" image and the "master" image, and it defines the 
remote function name correctly?


2. In your original email, the provided router does not route messages 
of type Any, but it actually
forwards them as-in, the remote functions API requires that the 
message being sent to the remote function

is of type Any.  Can you try something like this:

final class EventRouter implements Router<com.google.protobuf.Message> {

    static final FunctionType PYTHON_EVENT_COUNTER_TYPE =
        new FunctionType("demo", "eventCounterPython");
    static final FunctionType JAVA_EVENT_COUNTER_TYPE =
        new FunctionType("demo", "eventCounterJava");

    @Override
    public void route(com.google.protobuf.Message event,
                      Downstream<com.google.protobuf.Message> downstream) {
        downstream.forward(
            JAVA_EVENT_COUNTER_TYPE,
            "count",
            event);
        downstream.forward(
            new Address(
                PYTHON_EVENT_COUNTER_TYPE,
                "count"),
            Any.pack(event));
    }
}



In addition you would have to change the definition of your ingress 
identifier to have a produced type of com.google.protobuf.Message

instead of an Event.


Good luck!
Igal

On Fri, Jul 3, 2020 at 10:09 PM Jan Brusch wrote:


Hi Igal,

thanks for your reply. Initially I thought the same thing, but it
turns out I am able to call the remote function from an embedded
"wrapper" function using the exact same setup (Relevant Code
below). So that's one kind of solution to that Problem. But to me
it seems like it's a bit of a hack and not the idiomatic way to
solve this...

From my understanding of the address based communication within
Flink Stateful Functions, I feel like it should be possible to
call that function from the router directly. But I am probably
either using the Router wrong or misunderstand some of the ideas
behind address based communication...


EventRouter.java




final class EventRouter implements Router {

  @Override
  public void route(Event event, Downstream downstream) {
    downstream.forward(EventCounterWrapper.TYPE, "_", event);
  }
}


--


EventCounterWrapper.java


---

public class EventCounterWrapper implements StatefulFunction {

    static final FunctionType TYPE = new FunctionType("demo",
"eventCounterWrapper");
    public static final FunctionType REMOTE_FUNCTION_TYPE = new
FunctionType("demo/external", "eventCounterPython");

    @Override
    public void invoke(Context context, Object input) {
    if (input instanceof Event) {
    Event event = (Event) input;
    Any message = Any.pack(event);
    context.send(REMOTE_FUNCTION_TYPE, "_", message);
    }

    if (input instanceof Any) {
    final EventCount eventCount;
    try {
    eventCount = ((Any) input).unpack(EventCount.class);
    } catch (InvalidProtocolBufferException e) {
    throw new RuntimeException("Unexpected type", e);
    }
context.send(SessionIO.EVENT_COUNT_EGRESS_ID, eventCount);
    }
    }
}


---


worker.py

@functions.bind("demo/external/eventCounterPython")
def handle_event(context, _):
 state = context.state('count').unpack(EventCount)
 if not state:
 state = EventCount()
 state.count = 1
 else:
 state.count += 1
 context.state('count').pack(state)

    envelope 

Re: can't execute query when table type is datagen

2020-07-06 Thread Danny Chan
Dear xin Destiny ~

It seems that you are using the legacy planner, so the exception is thrown here [1] ~

I agree that there should be a prompt here to indicate that the legacy planner is in
use; I have filed an issue [2].
Actually, for the legacy planner it is a regression, because before the change the
computed column was supported well.

[1] 
https://github.com/apache/flink/blob/1b1c343e8964c6400c7c1de3c70212522ba59a64/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java#L86
[2] https://issues.apache.org/jira/browse/FLINK-18500

Best,
Danny Chan
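
(For reference, a minimal sketch of creating a table environment that uses the Blink
planner instead of the legacy one; in Zeppelin the planner is chosen through the
interpreter configuration rather than in code, so the snippet below assumes a plain
Java job and a placeholder DDL:)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class BlinkPlannerExample {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // The Blink planner supports computed columns such as "ts AS localtimestamp".
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();

    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
    // tEnv.executeSql("CREATE TABLE datagen_dijie2 (...) WITH ('connector' = 'datagen', ...)");
  }
}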
On 2020-07-05 10:52 AM +0800, xin Destiny wrote:
> Hi, all:
> I use Zeppelin to execute SQL. The Flink version is a Flink 1.11 snapshot, built from
> branch release-1.11, commit 334f35cbd6da754d8b5b294032cd84c858b1f973.
> When the table type is datagen, Flink throws an exception, but the
> exception message is null;
>
> My DDL is :
> CREATE TABLE datagen_dijie2 (
>  f_sequence INT,
>  f_random INT,
>  f_random_str STRING,
>  ts AS localtimestamp,
>  WATERMARK FOR ts AS ts
> ) WITH (
>  'connector' = 'datagen',
>  'rows-per-second'='5',
>  'fields.f_sequence.kind'='sequence',
>  'fields.f_sequence.start'='1',
>  'fields.f_sequence.end'='1000',
>  'fields.f_random.min'='1',
>  'fields.f_random.max'='1000',
>  'fields.f_random_str.length'='10'
> );
>
> My query sql is :
> select * from datagen_dijie2;
> the exception is :
> Fail to run sql command: select * from datagen_dijie2 
> org.apache.flink.table.api.ValidationException: SQL validation failed. null 
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>  at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>  at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
>  at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
>  at 
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:102)
>  at 
> org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:526)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:297)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:191)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:156)
>  at 
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
>  at 
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:776)
>  at 
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
>  at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at 
> org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
>  at 
> org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748) Caused by: 
> java.lang.UnsupportedOperationException at 
> org.apache.flink.table.planner.ParserImpl.parseSqlExpression(ParserImpl.java:86)
>  at 
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
>  at 
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
>  at 
> org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
>  at 
> org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408)
>  at 
> org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375)
>  at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75)
>  at 
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>  at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) at 
> org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143) at 
> org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99) 
> at 
> org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
>  at 
> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
>  at 
> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
>  at 
> org.apache.calcite.sql.validate.AbstractNamespace.

Re: Stateful Functions: Routing to remote functions

2020-07-06 Thread Igal Shilman
Hi Jan,

Two followup questions:

1. Looking at the stack trace provided in your email, it does seem like the
function type is unavailable, and I'd like to follow up on that: can you
please share your Dockerfile, so
we have the complete picture. If you are not comfortable sharing that, then
you can please try to execute into the container and manually validate that
the module.yaml is present
both on the "worker" image and the "master" image, and it defines the
remote function name correctly?

2. In your original email, the provided router does not route messages of
type Any, but instead forwards them as-is; the remote functions API requires
that the message being sent to the remote function
is of type Any.  Can you try something like this:

final class EventRouter implements Router<com.google.protobuf.Message> {

    static final FunctionType PYTHON_EVENT_COUNTER_TYPE =
        new FunctionType("demo", "eventCounterPython");
    static final FunctionType JAVA_EVENT_COUNTER_TYPE =
        new FunctionType("demo", "eventCounterJava");

    @Override
    public void route(com.google.protobuf.Message event,
                      Downstream<com.google.protobuf.Message> downstream) {
        downstream.forward(
            JAVA_EVENT_COUNTER_TYPE,
            "count",
            event);
        downstream.forward(
            new Address(
                PYTHON_EVENT_COUNTER_TYPE,
                "count"),
            Any.pack(event));
    }
}



In addition you would have to change the definition of your ingress
identifier to have a produced type of com.google.protobuf.Message
instead of an Event.


Good luck!
Igal

On Fri, Jul 3, 2020 at 10:09 PM Jan Brusch 
wrote:

> Hi Igal,
>
> thanks for your reply. Initially I thought the same thing, but it turns
> out I am able to call the remote function from an embedded "wrapper"
> function using the exact same setup (Relevant Code below). So that's one
> kind of solution to that Problem. But to me it seems like it's a bit of a
> hack and not the idiomatic way to solve this...
>
> From my understanding of the address based communication within Flink
> Stateful Functions, I feel like it should be possible to call that function
> from the router directly. But I am probably either using the Router wrong
> or misunderstand some of the ideas behind address based communication...
>
>
> EventRouter.java
>
>
> 
>
> final class EventRouter implements Router {
>
>   @Override
>   public void route(Event event, Downstream downstream) {
> downstream.forward(EventCounterWrapper.TYPE, "_", event);
>   }
> }
>
>
> --
>
>
> EventCounterWrapper.java
>
>
> ---
>
> public class EventCounterWrapper implements StatefulFunction {
>
> static final FunctionType TYPE = new FunctionType("demo",
> "eventCounterWrapper");
> public static final FunctionType REMOTE_FUNCTION_TYPE = new
> FunctionType("demo/external", "eventCounterPython");
>
> @Override
> public void invoke(Context context, Object input) {
> if (input instanceof Event) {
> Event event = (Event) input;
> Any message = Any.pack(event);
> context.send(REMOTE_FUNCTION_TYPE, "_", message);
> }
>
> if (input instanceof Any) {
> final EventCount eventCount;
> try {
> eventCount = ((Any) input).unpack(EventCount.class);
> } catch (InvalidProtocolBufferException e) {
> throw new RuntimeException("Unexpected type", e);
> }
> context.send(SessionIO.EVENT_COUNT_EGRESS_ID, eventCount);
> }
> }
> }
>
>
> ---
>
>
> worker.py
> 
> @functions.bind("demo/external/eventCounterPython")
> def handle_event(context, _):
>  state = context.state('count').unpack(EventCount)
>  if not state:
>  state = EventCount()
>  state.count = 1
>  else:
>  state.count += 1
>  context.state('count').pack(state)
>
>
> envelope = Any()
> envelope.Pack(state)
> context.reply(envelope)
> 
>
>
> module.yaml
>
> -
>
> spec:
> functions:
>   - function:
>   meta:
> kind: http
> type: demo/external/eventCounterPython
>   spec:
> endpoint: http://python-worker:8000/statefun
> states:
>   - count
>
> -
>
>
> Best Regards
>
> Jan
>
>
> On 03.07.20 17:33, Igal S

Re: Flink Kafka connector in Python

2020-07-06 Thread Manas Kale
I also tried doing this by using a User Defined Function.

class DataConverter(ScalarFunction):
def eval(self, str_data):
data = json.loads(str_data)
return ?? # I want to return data['0001'] in field
'feature1', data['0002'] in field 'feature2' etc.

t_env.register_function("data_converter", udf(DataConverter(),
input_types = [DataTypes.STRING()],
  result_type =
  DataTypes.ROW([

DataTypes.FIELD("feature1", DataTypes.STRING())
  ])))


t_env.from_path(INPUT_TABLE) \
.select("data_converter(data)") \ # <--- here "data" is the field
"data" from the previous mail
.insert_into(OUTPUT_TABLE)


I used a ROW to hold multiple values but I can't figure out how I can
return a populated ROW object from the eval() method. Where is the method
to construct a row/field object and return it?


Thanks!


On Fri, Jul 3, 2020 at 12:40 PM Manas Kale  wrote:

> Hi Xingbo,
> Thanks for the reply. I didn't know that a table schema also needs to be
> declared after the connector, but I understand now.
> I have another question: how do I write the parsing schemas for a field
> that itself is a valid JSON string? For example:
> {
> "monitorId": 865,
> "deviceId": "94:54:93:49:96:13",
> "data":
> "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
> "state": 2,
> "time": 1593687809180
> }
> The field "data" is a string of valid JSON with string:number objects. I'm
> currently trying using JSON schema object and DataTypes.ROW, but am getting
> deserialization errors.
>
> .with_format(
> Json()
> .json_schema(
> """
> {
> "type": "object",
> "properties": {
> "monitorId": {
> "type": "string"
> },
> "deviceId": {
> "type": "string"
> },
> "data": {
> "type": "object"
> },
> "state": {
> "type": "integer"
> },
> "time": {
> "type": "string"
> }
> }
> }
> """
> )
> ) \
> .with_schema(
> Schema()
> .field("monitorId", DataTypes.STRING())
> .field("deviceId", DataTypes.STRING())
> .field("data", DataTypes.ROW())
> )
>
> Regards,
>
> Manas
>
>
> On Thu, Jul 2, 2020 at 6:25 PM Xingbo Huang  wrote:
>
>> Hi, Manas
>> You need to define the schema. You can refer to the following example:
>>  t_env.connect(
>> Kafka()
>> .version('0.11')
>> .topic(INPUT_TOPIC)
>> .property("bootstrap.servers", PROD_KAFKA)
>> .property("zookeeper.connect", "localhost:2181")
>> .start_from_latest()
>> ) \
>> .with_format(
>> Json()
>> .json_schema(
>> "{"
>> "  type: 'object',"
>> "  properties: {"
>> "lon: {"
>> "  type: 'number'"
>> "},"
>> "rideTime: {"
>> "  type: 'string',"
>> "  format: 'date-time'"
>> "}"
>> "  }"
>> "}"
>> )
>> ) \
>> .with_schema(  # declare the schema of the table
>> Schema()
>> .field("lon", DataTypes.DECIMAL(20, 10))
>> .field("rideTime", DataTypes.TIMESTAMP(6))
>> ).register_table_source(INPUT_TABLE)
>>
>> Best,
>> Xingbo
>>
>> Manas Kale wrote on Thu, Jul 2, 2020 at 7:59 PM:
>>
>>> Hi,
>>> I'm trying to get a simple consumer/producer running using the following
>>> code referred from the provided links :
>>>
>>> from pyflink.dataset import ExecutionEnvironment
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, 
>>> StreamTableEnvironment
>>> from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema
>>>
>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>>
>>> t_config = TableConfig()
>>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>>
>>> INPUT_TOPIC = 'xyz'
>>> INPUT_TABLE = 'raw_message'
>>> PROD_ZOOKEEPER = '...'
>>> PROD_KAFKA = '...'
>>>
>>> OUTPUT_TOPIC = 'summary_output'
>>> OUTPUT_TABLE = 'feature_summary'
>>> LOCAL_ZOOKEEPER = 'localhost:2181'
>>> LOCAL_KAFKA = 'localhost:9092'
>>>
>>>
>>> t_env.connect(
>>> Kafka()
>>> .version('universal')
>>> .topic(INPUT_TOPIC)
>>> .property("bootstrap.servers", PROD_KAFKA)
>>>
>>> .start_from_latest()
>>> ) \
>>> .with_format(
>>> Json()
>>> .json_schema(
>>> "{"
>>> "  type: 'object',"
>>> "  properties: {"
>>> "lon: {"
>>> "  type: 'number'"
>>> "},"
>>> "rideTime: {"
>>> "  type: 'string',"
>>> "  format: 'date-time'"
>>> "}"
>>> "  }"
>>> "}"
>>> )
>>> ).register_table_source(INPUT_TABLE)
>>>
>>> t_env.connect(Kafka()
>>> .version('universal')
>>> .

Re: Stateful Functions: Deploying to existing Cluster

2020-07-06 Thread Igal Shilman
Hi Jan,

Stateful functions would look at the java class path for the module.yaml,
So one way would be including the module.yaml in your src/main/resources/
directory.

Good luck,
Igal.


On Mon, Jul 6, 2020 at 12:39 PM Jan Brusch 
wrote:

> Hi,
>
> quick question about Deploying a Flink Stateful Functions Application to
> an existing cluster: The Documentation says to integrate
> "statefun-flink-distribution" as additional maven Dependency in the fat
> jar.
> (
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/deployment-and-operations/packaging.html#flink-jar
> )
>
> But how and where do I upload my module.yml for external function
> definitions in that scenario...?
>
>
> Best regards
>
> Jan
>
>


Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler

Have you looked at the SLF4J reporter?

https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

On 06/07/2020 13:49, Manish G wrote:

Hi,

Is it possible to log Flink metrics in application logs apart from 
publishing it to Prometheus?


With regards





Logging Flink metrics

2020-07-06 Thread Manish G
Hi,

Is it possible to log Flink metrics in application logs apart from
publishing it to Prometheus?

With regards


Re: Asynchronous I/O poor performance

2020-07-06 Thread Benchao Li
Hi Mark,

According to your data, I think the config of the async operator is OK.
There is one more thing that might affect the throughput of the async operator:
the watermark. The unordered async operator still keeps the order between
watermarks, so did you use event time in your job, and if yes, what is the
watermark interval in your job?
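
(For context, a minimal sketch of the async enrichment pattern being discussed; the
client call is stubbed out, the capacity of 100 mirrors the value mentioned in this
thread, and the 30-second timeout and all names are assumptions:)

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class EnrichAsyncFunction extends RichAsyncFunction<String, String> {

  @Override
  public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
    // Stubbed async call; a real job would issue the gRPC request here and
    // complete the future from the client's callback, never blocking this thread.
    CompletableFuture
        .supplyAsync(() -> input + "-enriched")
        .thenAccept(enriched -> resultFuture.complete(Collections.singleton(enriched)));
  }

  public static DataStream<String> enrich(DataStream<String> input) {
    // capacity = 100 in-flight requests per subtask, 30 s timeout
    return AsyncDataStream.unorderedWait(
        input, new EnrichAsyncFunction(), 30, TimeUnit.SECONDS, 100);
  }
}

With event time, the unordered mode still holds results back until the preceding
watermark is emitted, which is why the watermark interval matters here.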

Mark Zitnik wrote on Sun, Jul 5, 2020 at 7:44 PM:

> Hi Benchao
>
> The capacity is 100
> Parallelism is 8
> Rpc req is 20ms
>
> Thanks
>
>
> On Sun, 5 Jul 2020, 6:16 Benchao Li,  wrote:
>
>> Hi Mark,
>>
>> Could you give more details about your Flink job?
>> - the capacity of AsyncDataStream
>> - the parallelism of AsyncDataStream operator
>> - the time of per blocked rpc request
>>
>>> Mark Zitnik wrote on Sun, Jul 5, 2020 at 3:48 AM:
>>
>>> Hi
>>>
>>> In my Flink application I need to enrich data using
>>> AsyncDataStream.unorderedWait,
>>> but I am getting poor performance. At the beginning I was just working with an
>>> HTTP call, but I have switched to gRPC. I am running on an 8-core node and
>>> getting a total of 3200 events per second; the service that I am using is not
>>> fully utilized and can produce up to 1 req/sec
>>>
>>> Flink job flow:
>>> Reading from Kafka ~> some enrichment with unorderedWait ~> map ~> write
>>> to Kafka
>>>
>>> Using Akka gRPC, code written in Scala
>>>
>>> Thanks
>>>
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>

-- 

Best,
Benchao Li


Re: Timeout when using RockDB to handle large state in a stream app

2020-07-06 Thread Felipe Gutierrez
Hi all,

I tested the two TPC-H queries 03 [1] and 10 [2] using the DataStream API on
the cluster with the RocksDB state backend. One thing that I did that
improved things a lot was to replace the List of POJOs with a
List of Tuples. Then I could load a table of 200MB in memory as my
state. However, the original table is 725MB, and it turned out that I
need a different configuration. I am not sure what more I can do to reduce
the size of my state. If any of you have an idea, I would be thankful to
hear it.

Now, speaking about the flink-conf.yaml file and the RocksDB
configuration. When I use these configurations on the flink-conf.yaml
the stream job still runs out of memory.
jobmanager.heap.size: 4g # default: 2048m
heartbeat.timeout: 10
taskmanager.memory.process.size: 2g # default: 1728m

Then I changed for this configuration which I can set
programmatically. The stream job seems to behave better. It starts to
process something, then the metrics disappear for some time and appear
again. The available and used memory on the TM
(flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed) is 167MB. And
the available and used memory on the JM
(flink_jobmanager_Status_JVM_Memory_Direct_MemoryUsed) is 610KB. I
guess the PredefinedOptions.SPINNING_DISK_OPTIMIZED configuration is
overwriting the configuration on the flink-conf.yaml file.

RocksDBStateBackend stateBackend = new RocksDBStateBackend(stateDir, true);
stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
env.setStateBackend(stateBackend);

How can I increase the memory of the JM and TM when I am still using
the PredefinedOptions.SPINNING_DISK_OPTIMIZED for RocksDB?

[1] 
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
[2] 
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery10.java

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Fri, Jul 3, 2020 at 9:01 AM Felipe Gutierrez
 wrote:
>
> yes. I agree. because RocsDB will spill data to disk if there is not
> enough space in memory.
> Thanks
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Fri, Jul 3, 2020 at 8:27 AM Yun Tang  wrote:
> >
> > Hi Felipe,
> >
> > I noticed my previous mail has a typo: RocksDB is executed in task main 
> > thread which does not take the role to respond to heart beat. Sorry for 
> > previous typo, and the key point I want to clarify is that RocksDB should 
> > not have business for heartbeat problem.
> >
> > Best
> > Yun Tang
> > 
> > From: Felipe Gutierrez 
> > Sent: Tuesday, June 30, 2020 17:46
> > To: Yun Tang 
> > Cc: Ori Popowski ; user 
> > Subject: Re: Timeout when using RockDB to handle large state in a stream app
> >
> > Hi,
> >
> > I reduced the size of the tables that I am loading on a ListState and
> > the query worked. One of them was about 700MB [1] [2].
> >
> > Now I am gonna deploy it on the cluster and check if it works. I will
> > probably need to increase the heartbeat timeout.
> >
> > Thanks,
> > Felipe
> > [1] 
> > https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
> > [2] 
> > https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com
> >
> > On Tue, Jun 30, 2020 at 10:51 AM Yun Tang  wrote:
> > >
> > > Hi Felipe
> > >
> > > RocksDB is executed in task main thread which does take the role to 
> > > respond to heart beat and RocksDB mainly use native memory which is 
> > > decoupled from JVM heap to not bring any GC pressure. Thus, timeout 
> > > should have no relationship with RocksDB in general if your task manager 
> > > is really heartbeat timeout instead of crash to exit.
> > >
> > > Try to increase the heartbeat timeout [1] and watch the GC detail logs to 
> > > see anything weird.
> > >
> > > [1] 
> > > https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#heartbeat-timeout
> > >
> > > Best
> > > Yun Tang
> > >
> > > 
> > > From: Ori Popowski 
> > > Sent: Monday, June 29, 2020 17:44
> > > Cc: user 
> > > Subject: Re: Timeout when using RockDB to handle large state in a stream 
> > > app
> > >
> > > Hi there,
> > >
> > > I'm currently experiencing the exact same issue.
> > >
> > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Heartbeat-of-TaskManager-timed-out-td36228.html
> > >
> > > I've found out that GC is causing the problem, but I still haven't 
> > > managed to solve this.
> > >
> > >
> > >
> > > On Mon, Jun 29, 2020 at 12:39 PM Felipe Gutierrez 
> > >  wrote:
> > >
> > > Hi community,
> > >
> > > I am trying to r

[ANNOUNCE] Weekly Community Update 2020/26-27

2020-07-06 Thread Konstantin Knauf
Dear community,

happy to share this (and last) week's community update. Flink 1.11 is
finally ready to be released. Besides that a few design discussions in
different areas of Apache Flink, like enhanced fan out for Flink's Kinesis
Source or Temporal Table support in pure Flink SQL, and of course a bit
more.

Flink Development
==

* [releases] Zhinjiang has published release candidate #4 for Flink 1.11.0
last Tuesday. The vote [1] passed this morning, so we will see the release
of Flink 1.11 very soon.

* [sql] A while ago I started a discussion on supporting Temporal Table
Joins via pure Flink SQL. As of now, a user either needs to register a
Temporal Table Function in the Table API or the environments configuration
of the SQL CLI. This became a more involved discussion than anticipated,
which Leonard Xu is doing a great job of moving forward. It seems that we
are close to a FLIP document now. [2]

* [connectors] Danny Cranmer has started the discussion [3] and -
subsequently - the vote [4] on FLIP-128, which adds support for enhanced
fan out for Flink's Kinesis source. With enhanced fan out each consumer
receives dedicated data output per shard, as opposed to competing for the
per-shared data output with other consumers.

* [apis] Aljoscha has started a discussion about what kind of compatibility
guarantees the community would like to give for the APIs that are commonly
used by packaged, third-party or custom connectors. Not too much feedback
so far, but right now it seems that we would like it to be safe to use
connectors across patch releases (1.x.y -> 1.x.z), but not across minor
releases (1.u -> 1.v). Based on the recent discussions [5] on the
guarantees for @PublicEvolving this means that connectors could only use
APIs that are annotated @Public or @PublicEvolving. [6]

* [state] Etienne Chauchot has published a design document for FLINK-17073,
which introduces a backpressure mechanism for checkpoints: when checkpoints
cannot be cleaned up as quickly as they are created, the triggering of new
checkpoints will be delayed. This change was motivated by an OOME on the
Jobmanger resulting from too many queued checkpoint clean up tasks. [7]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-11-0-release-candidate-4-tp42829.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLINK-16824-Creating-Temporal-Table-Function-via-DDL-tp40333.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-128-Enhanced-Fan-Out-for-AWS-Kinesis-Consumers-tp42728.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-128-Enhanced-Fan-Out-for-AWS-Kinesis-Consumers-tp42846.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Cross-version-compatibility-guarantees-of-Flink-Modules-Jars-tp42746.html
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Stability-guarantees-for-PublicEvolving-classes-tp41459.html
[7]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/FLINK-17073-checkpoint-backpressure-design-doc-tp42788.html

Notable Bugs
==

* [FLINK-18452] [1.11] [1.10.1] SQL queries that use "Top-N" can not be
restored from a savepoint due to an incorrectly implemented Object#equals in
one of the state objects. [8]

[8] https://issues.apache.org/jira/browse/FLINK-18452

Events, Blog Posts, Misc
===

* On the Ververica blog Jaehyeuk Oh & Gihoon Yeom explain how HyperConnect
is using Apache Flink for match making in their real-time communication app
Azar. [9]

* On the Flink blog, Jeff Zhang has published the second part of his blog
post series on Flink on Zeppelin. [10]

[9]
https://www.ververica.com/blog/data-driven-matchmaking-at-azar-with-apache-flink
[10]
https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html

Cheers,

Konstantin

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Stateful Functions: Deploying to existing Cluster

2020-07-06 Thread Jan Brusch

Hi,

quick question about Deploying a Flink Stateful Functions Application to 
an existing cluster: The Documentation says to integrate 
"statefun-flink-distribution" as additional maven Dependency in the fat 
jar. 
(https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/deployment-and-operations/packaging.html#flink-jar)


But how and where do I upload my module.yml for external function 
definitions in that scenario...?



Best regards

Jan



How to ensure that job is restored from savepoint when using Flink SQL

2020-07-06 Thread shadowell


Hello, everyone,
    I have some unclear points when using Flink SQL. I hope to get an
answer, or to be told where I can find one.
    When using the DataStream API, in order to ensure that the job can
recover its state from a savepoint after the job is adjusted, it is necessary to
specify a uid for each operator. However, when using Flink SQL, the uid of each
operator is automatically generated. If the SQL logic changes (the operator order
changes), will that cause some of the operator states to be impossible to map back
when the job is restored from the savepoint, resulting in state loss?
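
(For reference, this is the kind of per-operator uid assignment referred to above for
the DataStream API; the names and operations below are placeholders, not from any real
job:)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements("a", "b", "c")
        // A stable uid lets operator state be re-mapped after the job graph changes;
        // Flink SQL generates these ids automatically instead.
        .map(String::toUpperCase).uid("normalize-map")
        .keyBy(value -> value)
        .reduce((a, b) -> a + b).uid("concat-reduce")
        .print().uid("print-sink");

    env.execute("uid example");
  }
}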


Thanks~
Jie Feng (shadowell)
shadow...@126.com

Re: HELP ! ! ! When using Flink1.10 to define table with the string type, the query result is null

2020-07-06 Thread Benchao Li
Hi Jim,

This is a known issue [1]; could you verify whether it meets your
requirements?

[1] https://issues.apache.org/jira/browse/FLINK-18002

Jim Chen wrote on Mon, Jul 6, 2020 at 1:28 PM:

> Hi, everyone!
>
> When I use Flink 1.10 to define a table, I want to define the JSON array
> as the string type. But the query result is null when I execute the program.
> The detail code as follow:
>
> package com.flink;
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> /**
>  * kafka topic: test_action
>  *
>  * kafka message:
>  *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> "id002", "actionName": "bbb"} ] }
>  */
> public class Problem2 {
>
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
> envSettings);
> //bsEnv.registerFunction("explode3", new ExplodeFunction());
>
> String ddlSource = "CREATE TABLE actionTable3 (\n" +
> "action STRING\n" +
> ") WITH (\n" +
> "'connector.type' = 'kafka',\n" +
> "'connector.version' = '0.11',\n" +
> "'connector.topic' = 'test_action',\n" +
> "'connector.startup-mode' = 'earliest-offset',\n" +
> "'connector.properties.zookeeper.connect' =
> 'localhost:2181',\n" +
> "'connector.properties.bootstrap.servers' =
> 'localhost:9092',\n" +
> "'update-mode' = 'append',\n" +
> "'format.type' = 'json',\n" +
> //"'format.derive-schema' = 'true',\n" +
> "'format.json-schema' = '{\"type\": \"object\",
> \"properties\": {\"action\": {\"type\": \"string\"} } }'" +
> ")";
> System.out.println(ddlSource);
> bsEnv.sqlUpdate(ddlSource);
>
> Table table = bsEnv.sqlQuery("select * from actionTable3");
> //Table table = bsEnv.sqlQuery("select * from actionTable2,
> LATERAL TABLE(explode3(`action`)) as T(`word`)");
> table.printSchema();
> bsEnv.toAppendStream(table, Row.class)
> .print();// the result is null
>
> bsEnv.execute("ARRAY tableFunction Problem");
> }
> }
>


-- 

Best,
Benchao Li


Re: Does savepoint reset the base for incremental checkpoint

2020-07-06 Thread Congxian Qiu
Hi

The checkpoint base is only used for incremental checkpoints; the answer to
the first question is checkpoint x.

After restoring from a savepoint, there is no base for the first checkpoint.

You can refer to the code [1][2] for more information.

[1]
https://github.com/apache/flink/blob/c14f9d2f9f6d6f2da3dc41fcef010e12405e25eb/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L416
[2]
https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java

Best,
Congxian


Steven Wu wrote on Mon, Jul 6, 2020 at 11:46 AM:

> In a slightly different variation of sequence (checkpoint x, savepoint y,
> redeploy/restart job from savepoint y, checkpoint x+1), checkpoint x+1
> builds the incremental diff on savepoint y, right?
>
> On Sun, Jul 5, 2020 at 8:08 PM Steven Wu  wrote:
>
>>
>> In this sequence of (checkpoint x, savepoint y, checkpoint x+1), does
>> checkpoint x+1 build the incremental diff based on checkpoint x or
>> savepoint y?
>>
>> Thanks,
>> Steven
>>
>