Problem with a blocked Flink side-output operator

2020-11-08 Thread ZT.Ren
Problem description (this is another team's job, so I can't share screenshots, sorry):
  1. Flink version: 1.6.0
  2. Basic pipeline: Flink reads data from Kafka -> JSON parsing -> (process, parallelism 6) sends data to 11 downstream pipelines
  3. Symptom: after running for a while, the job gets stuck and the sink produces no data
  4. Monitoring: backpressure appears at the map->sideprocess operator, while the downstream window->sink shows no backpressure.
The outputBufferPool task metric of the map->sideprocess operator occasionally reaches 1 but stays at 0 most of the time.
Current question: with process (parallelism 6) using side outputs to 11 downstream branches, is side output supported in this scenario? (A sketch of such a fan-out is given below.)
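A minimal sketch (not the original job; tag names, types, and branch count are illustrative) of one ProcessFunction fanning out to several side outputs, which the DataStream API in 1.6 supports via OutputTag and ProcessFunction. Whether this pattern is the cause of the observed backpressure cannot be told from the sketch alone.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputFanOutSketch {

    // One OutputTag per downstream branch (the reported job has 11 of them).
    // Anonymous subclasses keep the element type information.
    static final OutputTag<String> BRANCH_1 = new OutputTag<String>("branch-1") {};
    static final OutputTag<String> BRANCH_2 = new OutputTag<String>("branch-2") {};

    public static void wire(DataStream<String> parsed) {
        SingleOutputStreamOperator<String> processed = parsed
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        // Route a copy of the record to each side output it belongs to.
                        ctx.output(BRANCH_1, value);
                        ctx.output(BRANCH_2, value);
                        out.collect(value); // main output
                    }
                })
                .setParallelism(6);

        // Each branch is read from the same operator and gets its own downstream pipeline.
        DataStream<String> branch1 = processed.getSideOutput(BRANCH_1);
        DataStream<String> branch2 = processed.getSideOutput(BRANCH_2);
        // ... branch1.addSink(...); branch2.addSink(...);
    }
}
```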




TaskManager produces no logs when submitting a job in Application mode via YARN

2020-11-08 Thread xiaozhennan1...@gmail.com
1. The Flink version is 1.11.1, submitted in Application mode.
2. I submit it like this:
yarnClusterDescriptor.deployApplicationCluster(ClusterSpecification,
ApplicationConfiguration)
3. I set the parameter env.java.opts.taskmanager to
-Dlog4j.configurationFile="log4j.properties"
4. The log4j.properties file is a direct copy of the log4j.properties under Flink/conf.
Has anyone run into this when submitting jobs through the API? (A sketch of this kind of submission is given below.)
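A minimal sketch (not a verified fix) of this kind of programmatic Application-mode submission against Flink 1.11. It assumes that the log4j.properties copied from Flink/conf must both be shipped into the YARN containers and be referenced from the TaskManager JVM options; the local path and the way the Configuration, descriptor, and specifications are obtained are illustrative.

```java
import java.io.File;
import java.util.Collections;

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.yarn.YarnClusterDescriptor;

public class ApplicationModeSubmitSketch {

    static void submit(Configuration flinkConfig,                    // the config the descriptor was built from
                       YarnClusterDescriptor yarnClusterDescriptor,
                       ClusterSpecification clusterSpecification,
                       ApplicationConfiguration applicationConfiguration) throws Exception {

        // Point the TaskManager JVM at the shipped log4j configuration. Shipped files
        // land in the container's working directory, so a plain file name is used.
        // This must be set on the Configuration the YarnClusterDescriptor was created with.
        flinkConfig.setString(
                "env.java.opts.taskmanager",
                "-Dlog4j.configurationFile=log4j.properties");

        // Ship the log4j.properties copied from FLINK_HOME/conf so the file actually
        // exists inside the YARN containers (the local path here is hypothetical).
        yarnClusterDescriptor.addShipFiles(
                Collections.singletonList(new File("/path/to/flink/conf/log4j.properties")));

        // Deploy the application cluster as in the original report.
        yarnClusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);
    }
}
```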

These are my startup configuration parameters.
The error message is below:
2020-11-09 13:13:14,365 ERROR 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler 
[] - Failed to transfer file from TaskExecutor 
container_e18_1587978117869_214352_01_02.
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: 
The file STDOUT is not available on the TaskExecutor.
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
 ~[?:1.8.0_112]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
 ~[?:1.8.0_112]
at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:647) 
~[?:1.8.0_112]
at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
 ~[?:1.8.0_112]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_112]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_112]
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:227)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 [?:1.8.0_112]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 [?:1.8.0_112]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
[?:1.8.0_112]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 [?:1.8.0_112]
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:890)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at akka.dispatch.OnComplete.internal(Future.scala:263) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.OnComplete.internal(Future.scala:261) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) 
[scala-library-2.11.12.jar:?]
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) 
[scala-library-2.11.12.jar:?]
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) 
[scala-library-2.11.12.jar:?]
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
 [akka-actor_2.11-2.5.21.jar:2.5.21]
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
 [akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) 
[scala-library-2.11.12.jar:?]
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) 
[scala-library-2.11.12.jar:?]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) 
[scala-library-2.11.12.jar:?]
at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
 [akka-actor_2.11-2.5.21.jar:2.5.21]
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
 [akka-actor_2.11-2.5.21.jar:2.5.21]
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 [akka-actor_2.11-2.5.21.jar:2.5.21]
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 [akka-actor_2.11-2.5.21.jar:2.5.21]
at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) 
[scala-library-2.11.12.jar:?]
at 
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 [akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[akka-actor_2.11-2.5.21.jar:2.5.21]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 

Re: How to get a BatchTableEnvironment object under the Blink planner in Flink 1.11.2

2020-11-08 Thread Danny Chan
>
> the BatchTableEnvironment environment


Do you mean the BatchTableEnvironment object?

Asahi Lee <978466...@qq.com> wrote on Mon, Nov 9, 2020 at 10:48 AM:

> Hi!
> I am using Flink 1.11.2. The official documentation says the Blink batch execution environment is obtained as follows:
> // ** // BLINK BATCH QUERY // ** import
> org.apache.flink.table.api.EnvironmentSettings; import
> org.apache.flink.table.api.TableEnvironment; EnvironmentSettings bbSettings
> =
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> TableEnvironment bbTableEnv =
> TableEnvironment.create(bbSettings);
> Besides the approach above, is there any other way to get a Blink batch execution environment? What I need is a BatchTableEnvironment; how do I get one?
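A formatted version of the snippet quoted above (Flink 1.11, Blink planner, batch mode), for readability. As far as the 1.11 documentation describes, the Blink planner's batch mode is driven through the unified TableEnvironment shown here, while BatchTableEnvironment is the bridge class tied to the legacy planner and the DataSet API, so there is no Blink-planner BatchTableEnvironment to obtain.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BlinkBatchEnvironmentSketch {

    // Builds the Blink batch table environment exactly as in the documentation snippet above.
    public static TableEnvironment create() {
        EnvironmentSettings bbSettings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        return TableEnvironment.create(bbSettings);
    }
}
```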


Re: flink sql kafka connector with avro confluent schema registry support

2020-11-08 Thread Danny Chan
Yes, it is supported; see the code in https://github.com/apache/flink/pull/12919/commits

陈帅 wrote on Tue, Nov 3, 2020 at 8:44 AM:

> Does the Kafka connector in Flink SQL 1.11.2 support the Avro format with the Confluent Schema Registry?
> I couldn't find anything about it on the website. If it does, please let me know or share an example, thanks!
>
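A hedged sketch of what using the Confluent-Registry Avro format can look like. The 'avro-confluent' format name and its option keys follow the Flink documentation for releases that include the linked PR (Flink 1.12 and later); on 1.11.2 the change would have to be backported or the format jar added separately. The topic, field names, broker address, and registry URL are illustrative.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AvroConfluentKafkaSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Kafka source table reading Confluent-Registry-encoded Avro records.
        tEnv.executeSql(
                "CREATE TABLE user_events (\n" +
                "  user_id BIGINT,\n" +
                "  event_type STRING,\n" +
                "  event_time TIMESTAMP(3)\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'user_events',\n" +
                "  'properties.bootstrap.servers' = 'kafka:9092',\n" +
                "  'format' = 'avro-confluent',\n" +
                "  'avro-confluent.schema-registry.url' = 'http://schema-registry:8081'\n" +
                ")");
    }
}
```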


Re: Is it possible to make two operators always locate in the same taskmanager?

2020-11-08 Thread Si-li Liu
Thanks for your reply.

It's a streaming job. The join operator joins two streams. The join state is
too large, so I don't want to keep it using the state mechanism Flink
provides, and I also don't need a very precise join. Instead I'd prefer to
let the join operator compute a backward timestamp as its state; if the
cluster restarts, the consumer can use setStartFromTimestamp to start from
that timestamp.

My problem now is that the consumer can't read the state the join operator
writes, so I need a way to send a small message (a 64-bit long) from a
downstream operator to an upstream one. Redis could be a solution, but
adding an external dependency is only a secondary option if I can pass this
message through memory.


Chesnay Schepler  于2020年11月6日周五 上午7:06写道:

> It would be good if you could elaborate a bit more on your use-case.
> Are you using batch or streaming? What kind of "message" are we talking
> about? Why are you thinking of using a static variable, instead of just
> treating this message as part of the data(set/stream)?
>
> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>
> Currently I use Flink 1.9.1. The actual thing I want to do is send some
> messages from downstream operators to upstream operators, which I consider
> use static variable.
>
> But it makes me have to make sure in one taskmanager process it always has
> these two operators, can I use CoLocationGroup to solve this problem? Or
> can anyone give me an example to demonstrate the usage of CoLocationGroup?
>
> Thanks!
> --
> Best regards
>
> Sili Liu
>
>
>

-- 
Best regards

Sili Liu


Re: Re: Choosing monitoring metrics for alerting when a Flink job goes down

2020-11-08 Thread bradyMk
But when the job is killed it does not restart, so if we only monitor the restart metric, won't we miss the alert for the case where the job gets killed?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Re: Re: Re: Flink StreamingFileSink rolling policy

2020-11-08 Thread bradyMk
Understood, many thanks!



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


How to get a BatchTableEnvironment object under the Blink planner in Flink 1.11.2

2020-11-08 Thread Asahi Lee
Hi!
I am using Flink 1.11.2. The official documentation says the Blink batch
execution environment is obtained as follows:
// ** // BLINK BATCH QUERY // ** import
org.apache.flink.table.api.EnvironmentSettings; import
org.apache.flink.table.api.TableEnvironment; EnvironmentSettings bbSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv =
TableEnvironment.create(bbSettings);
Besides the approach above, is there any other way to get a Blink batch
execution environment? What I need is a BatchTableEnvironment; how do I get one?

Re: Upsert UDFs

2020-11-08 Thread Jark Wu
Hi Rex,

A similar question was asked recently [1]; I think it has the same cause,
known as retraction amplification.
You can try turning on the mini-batch optimization [2] to reduce the
retraction amplification.

Best,
Jark

[1]:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/A-question-about-flink-sql-retreact-stream-td39216.html
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
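A minimal sketch of enabling the mini-batch aggregation optimization on the Blink planner, using the configuration keys from the tuning page linked in [2]; the latency and size values are illustrative and would need tuning for the job at hand.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setString("table.exec.mini-batch.enabled", "true");      // buffer input records
        conf.setString("table.exec.mini-batch.allow-latency", "5 s"); // flush at least every 5 s
        conf.setString("table.exec.mini-batch.size", "5000");         // or after 5000 buffered records
    }
}
```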

On Fri, 6 Nov 2020 at 03:56, Rex Fenley  wrote:

> Also, just to be clear our ES connector looks like this:
>
> CREATE TABLE sink_es_groups (
> id BIGINT,
> //.. a bunch of scalar fields
> array_of_ids ARRAY,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'elasticsearch-7',
> 'hosts' = '${env:ELASTICSEARCH_HOSTS}',
> 'index' = '${env:GROUPS_ES_INDEX}',
> 'format' = 'json',
> 'sink.bulk-flush.max-actions' = '512',
> 'sink.bulk-flush.max-size' = '1mb',
> 'sink.bulk-flush.interval' = '5000',
> 'sink.bulk-flush.backoff.delay' = '1000',
> 'sink.bulk-flush.backoff.max-retries' = '4',
> 'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
> )
>
>
> On Thu, Nov 5, 2020 at 11:52 AM Rex Fenley  wrote:
>
>> Hello,
>>
>> I'm using the Table API to do a bunch of stateful transformations on CDC
>> Debezium rows and then insert final documents into Elasticsearch via the ES
>> connector.
>>
>> I've noticed that Elasticsearch is constantly deleting and then inserting
>> documents as they update. Ideally, there would be no delete operation for a
>> row update, only for a delete. I'm using the Elasticsearch 7 SQL connector,
>> which I'm assuming uses `Elasticsearch7UpsertTableSink` under the hood,
>> which implies upserts are actually what it's capable of.
>>
>> Therefore, I think it's possibly my table plan that's causing row upserts
>> to turn into deletes + inserts. My plan is essentially a series of Joins
>> and GroupBys + UDF Aggregates (aggregating arrays of data). I think,
>> possibly the UDF Aggs following the Joins + GroupBys are causing the
>> upserts to split into delete + inserts somehow. If this is correct, is it
>> possible to make UDFs that preserve Upserts? Or am I totally off-base with
>> my assumptions?
>>
>> Thanks!
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com
>


Re: Re: Setting Flink TM CPU cores

2020-11-08 Thread Yangze Guo
How did you confirm it had no effect? Could you share the JM log?
Also, whether this parameter actually takes effect depends on whether YARN's scheduler has CPU scheduling enabled.

Best,
Yangze Guo

On Thu, Nov 5, 2020 at 1:50 PM zjfpla...@hotmail.com
 wrote:
>
> I set this in flink-conf.yaml but it had no effect.
>
>
>
> zjfpla...@hotmail.com
>
> From: JasonLee
> Sent: 2020-11-05 13:49
> To: user-zh
> Subject: Re: Setting Flink TM CPU cores
> Hi, setting the yarn.containers.vcores parameter should do it.
>
>
>
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Re: Can flink submit currently specify additional dependency jars?

2020-11-08 Thread izual
Are you submitting to a YARN cluster?
I wonder whether -yt [1] gives you the dependency-adding behavior you want.


 1: 
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L183








On 2020-11-06 11:12:33, "silence" wrote:
>Thanks for the reply. I still hope this can be solved at submit time; not being able to add dependencies limits many use cases, especially for a platform.
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: ValidationException using DataTypeHint in Scalar Function

2020-11-08 Thread Steve Whelan
Hi Dawid,

Just wanted to bump this thread in case you had any thoughts.

Thanks,

Steve

On Thu, Oct 29, 2020 at 2:42 PM Steve Whelan  wrote:

> For some background, I am upgrading from Flink v1.9 to v1.11. So what I am
> about to describe is our implementation on v1.9, which worked. I am trying
> to achieve the same functionality on v1.11.
>
> I have a DataStream whose type is an avro generated POJO, which contains a
> field *UrlParameters* that is of type *Map*. I register
> this stream as a view so I can perform SQL queries on it. One of the
> queries contains the UDF I have previously posted. It appears that in the
> conversion to a view, the type of *UrlParameters* is being converted into 
> *RAW('java.util.Map',
> ?)*.
>
>
> *Code on v1.9*
>
> DataStream pings = // a Kafka stream source deserialized into an avro
> generated POJO
> tableEnvironment.registerDataStream("myTable", pings);
> table = tableEnvironment.sqlQuery("SELECT MAP_VALUE(UrlParameters,
> 'some_key') FROM myTable");
> // tablesinks...
>
>
> *The produced type of my deserializer is:*
>
> @Override
> public TypeInformation getProducedType() {
> // Ping.class is an avro generated POJO
> return TypeInformation.of(Ping.class);
> }
>
> *Scalar UDF MAP_VALUE:*
>
> public static String eval(final Map map, final String key)
> {
> return map.get(key);
> }
>
>
> I am using a UDF to access fields in the *UrlParameters* map because if I
> try to access them directly in the SQL (i.e. `*UrlParameters['some_key']*`),
> I get the below exception. This stackoverflow[1] had suggested the UDF as a
> work around.
>
> Caused by: org.apache.flink.table.api.TableException: Type is not
> supported: ANY
> at
> org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalType(FlinkTypeFactory.scala:551)
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:478)
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
> at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$1(ExprCodeGenerator.scala:490)
>
>
> The above implementation worked successfully on v1.9. We use a stream
> source instead of a table source because we do other non-SQL things with
> the stream.
>
>
> *Code on v1.11*
>
> The following is the implementation on v1.11 which does not work. I was
> using the Old Planner on v1.9 but have switched to the Blink Planner on
> v1.11, in case that has any relevance here.
>
>
> DataStream pings = // a Kafka stream source deserialized into an avro
> generated POJO object
> tableEnvironment.createTemporaryView("myTable", pings);
> table = tableEnvironment.sqlQuery("SELECT MAP_VALUE(UrlParameters,
> 'some_key') FROM myTable");
> // tablesinks...
>
>
> The UDF referenced above produced the below error. So I assumed adding
> DataTypeHints was the way to solve it but I was unable to get that to work.
> That is what prompted the initial email to the ML.
>
> Caused by: org.apache.flink.table.api.ValidationException: Invalid input
> arguments. Expected signatures are:
> MAP_VALUE(map => MAP, key => STRING)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
> ... 50 more
> Caused by: org.apache.flink.table.api.ValidationException: Invalid
> argument type at position 0. Data type MAP expected but
> RAW('java.util.Map', ?) passed.
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
> ... 51 more
>
>
> I can try creating a concrete reproducible example if this explanation
> isn't enough though its quite a bit with the avro POJO and custom
> deserializer.
>
>
> Thanks,
>
> Steve
>
>
> [1]
> https://stackoverflow.com/questions/45621542/does-flink-sql-support-java-map-types
>
>>
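A hedged sketch of one direction for the DataTypeHint approach discussed in this thread: letting the UDF accept any input type (including the RAW('java.util.Map', ?) the planner derives for the POJO field) and casting inside. The annotation and InputGroup come from org.apache.flink.table.annotation in Flink 1.11, and the function would be registered with, for example, tableEnvironment.createTemporarySystemFunction("MAP_VALUE", MapValueFunction.class); whether this resolves the exact case above is untested here.

```java
import java.util.Map;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

public class MapValueFunction extends ScalarFunction {

    // Accept any data type (including RAW) for the first argument and cast at runtime.
    public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object map, String key) {
        if (map instanceof Map) {
            Object value = ((Map<?, ?>) map).get(key);
            return value == null ? null : value.toString();
        }
        return null;
    }
}
```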


Re: Re: Setting Flink TM CPU cores

2020-11-08 Thread zjfpla...@hotmail.com
Do you mean a startup parameter?



zjfpla...@hotmail.com
 
From: JasonLee
Sent: 2020-11-05 13:59
To: user-zh
Subject: Re: Re: Setting Flink TM CPU cores
Hi
You can set it like this: -yD yarn.containers.vcores=<your value>
 
 
 
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: A question about passing values into flink-sql

2020-11-08 Thread site
We are currently using the streaming approach: the HBase query is done inside a map
function, but with very large data volumes this is far too inefficient, since every incoming message downloads all the HBase data again. As for the UDF approach, I tried it in a WHERE condition over the last couple of days, but it still can't be used as flexibly as processing a stream, because a Kafka stream cannot be joined with an HBase stream, so the stream message can only be processed once. After all, in this scenario the stream serves as the condition input, so flink-sql feels rather limited here.
On 2020-11-07 02:37:29, "hailongwang" <18868816...@163.com> wrote:
>Hi si_tianqiang,
>
>
>Could a custom UDF solve your problem?
>For example, define the field received from Kafka as an hbaseQuery, and then have a custom UDF run the lookup according to that query.
>
>
>Best,
>Hailong Wang
>
>
>
>
>On 2020-11-06 10:41:53, "site" wrote:
>>Looking at the examples on the website, the values passed into the SQL are all fixed. My scenario is to receive query conditions from a Kafka message queue, then query an HBase table mapped in flink-sql and write to a result table. I used the approach of mapping the message queue to a table and joining it with the data table; in retrospect that approach is not great. Is there a good way to implement dynamic query parameters in SQL?


Help needed to increase throughput of simple flink app

2020-11-08 Thread ashwin konale
Hey guys,
I am struggling to improve the throughput of my simple flink application.
The target topology is this.

read_from_kafka(byte array deserializer) --rescale-->
processFunction(confluent avro deserialization) -> split -> 1.
data_sink,2.dlq_sink

Kafka traffic is pretty high
Partitions: 128
Traffic:  ~500k msg/s, 50Mbps.

Flink is running on k8s, writing to HDFS 3. I have ~200 CPUs and 400 GB of
memory at hand. I have tried a few configurations but I cannot get the
throughput above 1 million records per second (which I need for recovering
from failures). I have tried increasing the parallelism a lot (up to 512),
but it has very little impact on throughput. The primary metrics I am
considering for throughput are the kafka-source numRecordsOut and the message
backlog. I have already increased the Kafka consumer defaults like
max.poll.records, etc. Here are the things I have tried already.
Try0: Check raw kafka consumer throughput (kafka_source -> discarding_sink)
tm: 20, slots:4, parallelism 80
throughput: 10Mil/s

Try1: Disable chaining to introduce network related lag.
tm: 20, slots:4, parallelism 80
throughput: 1Mil/s
Also tried with increasing floating-buffers to 100, and buffers-per-channel
to 64. Increasing parallelism seems to have no effect.
Observation: out/in buffers are always at 100% utilization.

After this I tried various combinations of network configs, parallelism, JVM
sizes, etc., but throughput seems to be stuck at 1 million per second. Can
someone please help me figure out which key metrics to look at and how I can
improve the situation? Happy to provide any details needed.

Flink version: 1.11.2


Understanding kafka watermark strategy assigner

2020-11-08 Thread Nikola Hrusov
Hi,

I am reading about watermark creation for Kafka streams using the
article here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

In there, an example is given where the watermark assigner is attached
directly to the consumer, like so (solution 1):

FlinkKafkaConsumer myConsumer = new FlinkKafkaConsumer<>("topic",
> new SimpleStringSchema(), properties);
>
> myConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
> env.addSource(myConsumer)


Then we can use that by adding it as a source and continue with the
application.

My question is: would that make any/much difference compared to doing it after
the source? Something like this (solution 2):


> FlinkKafkaConsumer myConsumer = new FlinkKafkaConsumer<>("topic",
> new SimpleStringSchema(), properties);
> env
>     .addSource(myConsumer)
>     .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
>

I can see that it would create an extra operator, but is there any other
[unnecessary] overhead that solution 2 has over solution 1? I tried running a
simple job, but I couldn't see much difference. I would like to know if there
is something I am unaware of and whether I can do better. (A side-by-side
sketch of the two placements is given after this message.)

Regards,
Nikola Hrusov
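A short sketch of the two placements discussed above, with illustrative names (Flink 1.11-style API). According to the Kafka connector documentation linked at the top of this message, attaching the strategy to the consumer (solution 1) makes the source generate watermarks per Kafka partition inside the source, whereas assigning after addSource (solution 2) generates them per parallel source instance in a separate operator; beyond that, solution 2 mainly adds the extra (chained) operator already mentioned.

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class WatermarkPlacementSketch {
    public static void build(StreamExecutionEnvironment env, Properties properties) {
        WatermarkStrategy<String> strategy =
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20));

        // Solution 1: watermarks generated inside the Kafka source, per partition.
        FlinkKafkaConsumer<String> consumer1 =
                new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
        consumer1.assignTimestampsAndWatermarks(strategy);
        DataStream<String> s1 = env.addSource(consumer1);

        // Solution 2: watermarks generated in a separate operator after the source.
        FlinkKafkaConsumer<String> consumer2 =
                new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
        DataStream<String> s2 = env.addSource(consumer2)
                .assignTimestampsAndWatermarks(strategy);

        // ...continue building the pipeline from s1 or s2.
    }
}
```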


Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..

2020-11-08 Thread Simone Cavallarin
Hi Till,

That's great! Thank you so much!!! I have spent a week on this. I'm so
relieved!

Cheers

s



From: Till Rohrmann 
Sent: 06 November 2020 17:56
To: Simone Cavallarin 
Cc: user@flink.apache.org ; Aljoscha Krettek 

Subject: Re: How to use properly the function: withTimestampAssigner((event, 
timestamp) ->..

Hi Simone,

The problem is that the Java 1.8 compiler cannot do type inference when 
chaining methods [1].

The solution would be

WatermarkStrategy wmStrategy =
WatermarkStrategy
.forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> { return 
event.getTime();
});

@Aljoscha Krettek I think we need to update the 
documentation about it. We have some examples which don't take this into 
account.

[1] 
https://e.printstacktrace.blog/java-type-inference-generic-methods-chain-call/

Cheers,
Till
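A hedged sketch of the same fix with the generic type parameter written out explicitly, which is the usual way to work around the javac 1.8 inference limitation for chained generic calls described in [1]; it assumes the Event POJO defined further down in this thread.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

class ExplicitTypeWatermarkSketch {
    // The explicit <Event> type witness lets withTimestampAssigner see Event
    // instead of Object, so event.getTime() resolves.
    static WatermarkStrategy<Event> strategy() {
        return WatermarkStrategy
                .<Event>forMonotonousTimestamps()
                .withTimestampAssigner((event, timestamp) -> event.getTime());
    }
}
```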

On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin <cavalla...@hotmail.com> wrote:
Hi,

I'm taking the timestamp from the event payload that I'm receiving from Kafka.

I'm struggling to get the time and I'm confused about how I should use the
function ".withTimestampAssigner()". I'm receiving an error on event.getTime()
telling me "cannot resolve method 'Get Time' in 'Object'", and I really don't
understand how to fix it. My class provides a long, so the variable itself
should be fine. Any help would be really appreciated.

This is my code:

FlinkKafkaConsumer kafkaData =
new FlinkKafkaConsumer("CorID_0", new 
EventDeserializationSchema(), p);
WatermarkStrategy wmStrategy =
WatermarkStrategy
.forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> { return 
event.getTime();
});

DataStream stream = env.addSource(
kafkaData.assignTimestampsAndWatermarks(wmStrategy));


And to give you the idea of the whole project,

This is the EventDeserializationSchema class:

public class EventDeserializationSchema implements DeserializationSchema 
{

private static final long serialVersionUID = 1L;


private static final CsvSchema schema = CsvSchema.builder()
.addColumn("firstName")
.addColumn("lastName")
.addColumn("age", CsvSchema.ColumnType.NUMBER)
.addColumn("time")
.build();

private static final ObjectMapper mapper = new CsvMapper();

@Override
public Event deserialize(byte[] message) throws IOException {
return mapper.readerFor(Event.class).with(schema).readValue(message);
}

@Override
public boolean isEndOfStream(Event nextElement) {
return false;
}

@Override
public TypeInformation getProducedType() {

return TypeInformation.of(Event.class);
}
}

And this is the Event Class:

public class Event implements Serializable {
public String firstName;
public String lastName;
private int age;
public Long time;



public Event() {
}

public String getFirstName() {
return firstName;
}

public void setFirstName(String firstName) {
this.firstName = firstName;
}

public String getLastName() {
return lastName;
}

public void setLastName(String lastName) {
this.lastName = lastName;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

public long getTime() {
return time;
}

public void setTime(String kafkaTime) {
long tn = OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();
this.time = tn;
}
}







Re: Flink memory overuse problem

2020-11-08 Thread hailongwang
Hi Bob,
You can try setting the parameter 'state.backend.rocksdb.memory.fixed-per-slot' [1] and see whether it helps.
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#state-backend-rocksdb-memory-fixed-per-slot


Best,
Hailong Wang




On 2020-11-08 10:50:29, "元始(Bob Hu)" <657390...@qq.com> wrote:
>I'd like some advice: one of my Flink jobs keeps getting killed by the YARN cluster for exceeding its memory, and I'm not sure how to troubleshoot it. Flink version 1.11.0, launch command:
>bin/flink run -m yarn-cluster -yjm 2048m -ytm 8192m -ys 2
>xxx.jar, using the RocksDB state backend, with taskmanager.memory.managed.fraction=0.6 and taskmanager.memory.jvm-overhead.fraction=0.2 set. Below are the TaskManager statistics from the Flink UI at one point in time. Where could the memory overuse be coming from? The program doesn't seem to use third-party jars that allocate a lot of native memory, and my own code doesn't use native memory either.
>
>
>Free Slots / All Slots: 0 / 2
>CPU Cores: 24
>Physical Memory: 251 GB
>JVM Heap Size: 1.82 GB
>Flink Managed Memory: 4.05 GB
>
>Memory
>
>JVM (Heap/Non-Heap)
>  Type       Committed   Used      Maximum
>  Heap       1.81 GB     1.13 GB   1.81 GB
>  Non-Heap   169 MB      160 MB    1.48 GB
>  Total      1.98 GB     1.29 GB   3.30 GB
>
>Outside JVM
>  Type     Count    Used     Capacity
>  Direct   24,493   718 MB   718 MB
>  Mapped   0        0 B      0 B
>
>Network
>  Memory Segments: Available 21,715, Total 22,118
>
>Garbage Collection
>  Collector      Count   Time
>  PS_Scavenge    199     17,433
>  PS_MarkSweep   4       4,173


Re: flink savepoint

2020-11-08 Thread 张锴
Got it. From the JM log I saw that a write permission was missing; after fixing that it worked.

Congxian Qiu wrote on Fri, Nov 6, 2020 at 1:31 PM:

> Hi
>  Can you see any other exceptions in the client log or the JM log?
> Best,
> Congxian
>
>
> 张锴 wrote on Fri, Nov 6, 2020 at 11:42 AM:
>
> > Restarts and backpressure are both normal.
> > I also increased the client-to-master timeout, but the problem is still there.
> >
> > hailongwang <18868816...@163.com> wrote on Fri, Nov 6, 2020 at 10:54:
> >
> > > Hi,
> > >
> > >
> > > This error only means the savepoint did not complete within the allotted time, so the client's connection to the Master timed out.
> > > You need to check the Jobmaster log for the actual cause.
> > > PS: when a job keeps restarting or is backpressured, savepoints usually fail.
> > >
> > >
> > > Best,
> > > Hailong Wang
> > >
> > >
> > >
> > >
> > > On 2020-11-06 09:33:48, "张锴" wrote:
> > > >I ran into an error while taking a snapshot with flink savepoint; I'm not yet sure what caused it, could someone take a look?
> > > >
> > > >Flink version 1.10.1
> > > >
> > > >
> > > >Command executed: flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
> > > >hdfs://hadoopnamenodeHA/flink/flink-savepoints
> > > >
> > > >
> > > >The error message is:
> > > >
> > > >
> > > >org.apache.flink.util.FlinkException: Triggering a savepoint for the
> job
> > > >a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
> > > >
> > > > at
> > > org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> > > >
> > > > at java.security.AccessController.doPrivileged(Native Method)
> > > >
> > > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > >
> > > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> > > >
> > > >Caused by: java.util.concurrent.TimeoutException
> > > >
> > > > at
> > >
> > >
> >
> >java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> > > >
> > > > at
> > > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
> > >
> >
>


Re: flink savepoint

2020-11-08 Thread 张锴
I already specified it.

admin <17626017...@163.com> wrote on Fri, Nov 6, 2020 at 3:17 PM:

> Hi,
> Is your job running on YARN? If so, you need to specify -yid.
>
> > On Nov 6, 2020, at 1:31 PM, Congxian Qiu wrote:
> >
> > > Hi
> > > Can you see any other exceptions in the client log or the JM log?
> > Best,
> > Congxian
> >
> >
> > 张锴 wrote on Fri, Nov 6, 2020 at 11:42 AM:
> >
> >> Restarts and backpressure are both normal.
> >> I also increased the client-to-master timeout, but the problem is still there.
> >>
> >> hailongwang <18868816...@163.com> wrote on Fri, Nov 6, 2020 at 10:54:
> >>
> >>> Hi,
> >>>
> >>>
> >>> This error only means the savepoint did not complete within the allotted time, so the client's connection to the Master timed out.
> >>> You need to check the Jobmaster log for the actual cause.
> >>> PS: when a job keeps restarting or is backpressured, savepoints usually fail.
> >>>
> >>>
> >>> Best,
> >>> Hailong Wang
> >>>
> >>>
> >>>
> >>>
>  On 2020-11-06 09:33:48, "张锴" wrote:
>  I ran into an error while taking a snapshot with flink savepoint; I'm not yet sure what caused it, could someone take a look?
> 
>  Flink version 1.10.1
> 
> 
>  Command executed: flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
>  hdfs://hadoopnamenodeHA/flink/flink-savepoints
> 
> 
>  The error message is:
> 
> 
>  org.apache.flink.util.FlinkException: Triggering a savepoint for the
> job
>  a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> 
>  at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
> 
>  at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
> 
>  at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
> 
>  at
> >>> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
> 
>  at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
> 
>  at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> 
>  at java.security.AccessController.doPrivileged(Native Method)
> 
>  at javax.security.auth.Subject.doAs(Subject.java:422)
> 
>  at
> >>>
> >>>
> >>>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> 
>  at
> >>>
> >>>
> >>>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> 
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> 
>  Caused by: java.util.concurrent.TimeoutException
> 
>  at
> >>>
> >>>
> >>>
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> 
>  at
> >>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> 
>  at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
> >>>
> >>
>
>