Re: Parsing kafka json data with flink

2020-07-25 Thread 吴 祥平
Switching to the csv format with a very uncommon delimiter works, e.g. \u0005.
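A rough sketch of that workaround as a Flink SQL DDL (topic, server, and columns are made up; whether the delimiter can be given as a Unicode escape such as U&'\0005' depends on the Flink version, and older versions may require embedding the literal character instead):

CREATE TABLE kafka_source (
  id STRING,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'my_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'csv',
  'csv.field-delimiter' = U&'\0005'
)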




Re: Is there a way to use stream API with this program?

2020-07-25 Thread David Anderson
In this use case, couldn't the custom trigger register an event time timer
for MAX_WATERMARK, which would be triggered when the bounded input reaches
its end?
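For illustration, a minimal sketch of such a trigger, assuming a GlobalWindows assigner (the class name is made up and this is not code from the thread):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class EndOfInputTrigger extends Trigger<Object, GlobalWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   GlobalWindow window, TriggerContext ctx) {
        // GlobalWindow#maxTimestamp() is Long.MAX_VALUE, so this timer only
        // fires when the final MAX_WATERMARK is emitted at end of input.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp()
            ? TriggerResult.FIRE_AND_PURGE
            : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}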

David

On Mon, Jul 20, 2020 at 5:47 PM Piotr Nowojski  wrote:

> Hi,
>
> I'm afraid there is no out-of-the-box way of doing this. I've created a
> ticket [1] to write down and document a discussion that we had about this
> issue in the past.
>
> The issue is that, currently, untriggered processing time timers are
> ignored at end of input, and there seems to be no single way of handling
> this that is right for all cases, so it probably needs to be customizable.
>
> Maybe you could:
> 1. extend `WindowOperator` (`MyWindowOperator`)
> 2. implement
> `org.apache.flink.streaming.api.operators.BoundedOneInput#endInput` in your
> `MyWindowOperator`
> 3. inside `MyWindowOperator#endInput` invoke
> `internalTimerService.forEachProcessingTimeTimer(...)` and:
>   a) manually trigger the timers via `WindowOperator#onProcessingTime`
>   b) delete the manually triggered timers
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-18647
>
> On Fri, 17 Jul 2020 at 10:30, Flavio Pompermaier
> wrote:
>
>> Hi to all,
>> I was trying to port another job we have, which uses the DataSet API, to
>> the DataStream API.
>> The legacy program basically did a dataset.mapPartition().reduce(), so I
>> tried to replicate it with:
>>
>>  final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
>>  final DataStream<Row> input = env.fromElements(//
>> Row.of(1.0), //
>> Row.of(2.0), //
>> Row.of(3.0), //
>> Row.of(5.0), //
>> Row.of(6.0)).returns(new RowTypeInfo(columnType));
>>  input.map(new SubtaskIndexAssigner(columnType))
>> .keyBy(t -> t.f0)
>> .window(GlobalWindows.create())
>>
>> .trigger(PurgingTrigger.of(CountWithTimeoutTriggerPartition.of(Time.seconds(5),
>> 100L)))
>> .process(..)
>>
>> Unfortunately the program exits before reaching the process function
>> (moreover, I need to add another window + trigger after it before adding
>> the reduce function).
>> Is there a way to do this with the DataStream API, or should I stick with
>> the DataSet API for the moment (until batch is fully supported)? I append
>> all the code required to test the job below.
>>
>> Best,
>> Flavio
>>
>> -
>>
>> package org.apache.flink.stats.sketches;
>>
>> import org.apache.flink.api.common.functions.ReduceFunction;
>> import org.apache.flink.api.common.functions.RichMapFunction;
>> import org.apache.flink.api.common.state.ReducingState;
>> import org.apache.flink.api.common.state.ReducingStateDescriptor;
>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.common.typeutils.base.LongSerializer;
>> import org.apache.flink.api.java.io.PrintingOutputFormat;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import
>> org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
>> import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>> import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
>> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
>> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
>> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
>> import org.apache.flink.types.Row;
>> import org.apache.flink.util.Collector;
>>
>> public class Test {
>>   public static void main(String[] args) throws Exception {
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>> env.setParallelism(1);
>>
>> final BasicTypeInfo<Double> columnType =
>> BasicTypeInfo.DOUBLE_TYPE_INFO;
>> final DataStream<Row> input = env.fromElements(//
>> Row.of(1.0), //
>> Row.of(2.0), //
>> Row.of(3.0), //
>> Row.of(5.0), //
>> Row.of(6.0)).returns(new RowTypeInfo(columnType));
>> final DataStream<Row> out = input.map(new
>> SubtaskIndexAssigner(columnType))//
>> .keyBy(t -> t.f0)//
>> .window(GlobalWindows.create())
>> .trigger(PurgingTrigger.of(new
>> CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
>> .process(new ProcessWindowFunction, Row,
>> 

Re: Count of records coming into the Flink App from KDS and at each step through Execution Graph

2020-07-25 Thread David Anderson
Setting up a Flink metrics dashboard in Grafana requires setting up and
configuring one of Flink's metrics reporters [1] that is supported by
Grafana as a data source. That means your options for a metrics reporter
are Graphite, InfluxDB, Prometheus, or the Prometheus push reporter.

If you want reporting every 5 seconds, with the push based reporters that's
something you would configure in flink-conf.yaml, whereas with Prometheus
you'll need to configure the scrape interval in the prometheus config file.
For more on using Flink with Prometheus, see the blog post by Maximilian
Bode [2].

Best,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
[2] https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html
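As a concrete illustration of the flink-conf.yaml approach, a sketch for the
InfluxDB reporter with a 5-second interval might look like this (host, port,
and database name are assumptions):

metrics.reporters: influxdb
metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink
metrics.reporter.influxdb.interval: 5 SECONDS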

On Fri, Jul 24, 2020 at 12:57 AM Vijay Balakrishnan 
wrote:

> Hi,
> I am trying to figure out how many records came into the Flink app from
> KDS and how many records got moved to the next step or were dropped by the
> watermarks.
>
> In the UI table I see *Source: Records Sent* with a total, and the next
> step, the *Filter->FlatMap* operator, with a *Records Received* total. How
> can I get these metric values to display in Grafana? For example, for each
> 5-second interval I want to know how many records came in and how many
> were filtered out by the watermark or by my custom Filter operator.
>
> I looked at the breakdown of the Source__Custom_Source in Metrics as shown
> in the attached pic. It has values like 0.NumRecordsIn and 0.NumRecordsOut
> and so on, from 0 to 9, for the parallelism of 10 I specified. It also has
> various breakdowns like 0.Timestamps/Watermarks.numRecordsIn and
> 0.Timestamps/Watermarks.numRecordsOut.
>
> Attached are some screenshots of the Flink DashBoard UI.
>
> TIA,
>
>


Re: Flink 1.11 job stop with save point timeout error

2020-07-25 Thread Congxian Qiu
Hi Ivan,
   From the JM log, the savepoint completed within 1 second, but the timeout
exception says that the stop-with-savepoint could not be completed within
60s (this is 20 -- RestOptions#RETRY_MAX_ATTEMPTS -- times 3s --
RestOptions#RETRY_DELAY; you can check the logic here [1]). I'm not sure
what the root cause is at the moment; could you please share the complete
JM log? Thanks.

[1]
https://github.com/apache/flink/blob/abd58adb7aad54d242b67219498c211e9e18168b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L382
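(For reference, those two settings correspond to rest.retry.max-attempts and
rest.retry.delay in flink-conf.yaml, so the client-side timeout can be
stretched if a savepoint legitimately needs more time; the values below are
only illustrative.)

rest.retry.max-attempts: 40
rest.retry.delay: 3000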
Best,
Congxian


Ivan Yang wrote on Sat, Jul 25, 2020 at 3:48 AM:

> Hi Robert,
> Below is the job manager log after issuing the “flink stop” command
>
> 
> 2020-07-24 19:24:12,388 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Triggering checkpoint 1 (type=CHECKPOINT) @ 1595618652138 for job
> 853c59916ac33dfbf46503b33289929e.
> 2020-07-24 19:24:13,914 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed
> checkpoint 1 for job 853c59916ac33dfbf46503b33289929e (7146 bytes in 1774
> ms).
> 2020-07-24 19:27:59,299 INFO  org.apache.flink.runtime.jobmaster.JobMaster
> [] - Triggering stop-with-savepoint for job
> 853c59916ac33dfbf46503b33289929e.
> 2020-07-24 19:27:59,655 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Triggering checkpoint 2 (type=SYNC_SAVEPOINT) @ 1595618879302 for job
> 853c59916ac33dfbf46503b33289929e.
> 2020-07-24 19:28:00,962 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed
> checkpoint 2 for job 853c59916ac33dfbf46503b33289929e (7147 bytes in 1240
> ms).
> ==
>
> It looks normal to me.
>
> In the Kubernetes deployment cluster, we set up a metrics reporter; it has
> these keys in the flink-config.yaml:
>
> # Metrics Reporter Configuration
> metrics.reporters: wavefront
> metrics.reporter.wavefront.interval: 60 SECONDS
> metrics.reporter.wavefront.env: prod
> metrics.reporter.wavefront.class:
> com.x.flink.monitor.WavefrontReporter
> metrics.reporter.wavefront.host: xx
> metrics.reporter.wavefront.token: xx
> metrics.scope.tm: flink.taskmanager
>
> Could this reporter interval interfere with the job manager? I tested the
> same job on a standalone Flink 1.11.0 without the reporter: "flink stop"
> worked, with no hanging or timeout. Also, the same reporter is used with
> 1.9.1, where we didn't have any issue with "flink stop".
>
> Thanks
> Ivan
>
>
> On Jul 24, 2020, at 5:15 AM, Robert Metzger  wrote:
>
> Hi Ivan,
> thanks a lot for your message. Can you post the JobManager log here as
> well? It might contain additional information on the reason for the timeout.
>
> On Fri, Jul 24, 2020 at 4:03 AM Ivan Yang  wrote:
>
>> Hello everyone,
>>
>> We recently upgraded Flink from 1.9.1 to 1.11.0 and found one strange
>> behavior: when we stop a job with a savepoint, we get the following
>> timeout error. I checked the Flink web console; the savepoint is created
>> in S3 in 1 second. The job is fairly simple, so 1 second for savepoint
>> generation is expected. We use a Kubernetes deployment. I clocked it: it
>> takes about 60 seconds before this error is returned. Afterwards the job
>> hangs (it still says running, but is actually not doing anything) and I
>> need to run another command to cancel it. Does anyone have an idea what's
>> going on here? BTW, "flink stop" worked for us in 1.9.1 before.
>>
>>
>>
>> flink@flink-jobmanager-fcf5d84c5-sz4wk:~$ flink stop
>> 88d9b46f59d131428e2a18c9c7b3aa3f
>> Suspending job "88d9b46f59d131428e2a18c9c7b3aa3f" with a savepoint.
>>
>> 
>>  The program finished with the following exception:
>>
>> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
>> "88d9b46f59d131428e2a18c9c7b3aa3f".
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495)
>> at
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
>> at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>> at
>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>> Caused by: java.util.concurrent.TimeoutException
>> at
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
>> ... 

Re: metrics influxdb reporter does not support https; jar placement path issue

2020-07-25 Thread Congxian Qiu
Hi,
   Thanks for the feedback. You can create two issues to track these two
problems separately. If you are interested, you are also welcome to try
contributing fixes for them.

Best,
Congxian


zz zhang wrote on Thu, Jul 23, 2020 at 5:02 PM:

> Hello, the org.apache.flink.metrics.influxdb.InfluxdbReporter shipped in
> Flink 1.11.1 currently reports over the http protocol by default and does
> not support https; see the source code [2].
>
> Also, the documentation [1] says to copy
> /opt/flink-metrics-influxdb-1.11.0.jar into the plugins/influxdb directory,
> but based on my testing it should be copied into plugins/metrics-influx
> instead.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html#influxdb-orgapacheflinkmetricsinfluxdbinfluxdbreporter
> [2]
> https://github.com/apache/flink/blob/release-1.11/flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/InfluxdbReporter.java#L84
> --
> Best,
> zz zhang
>


Re: How to register a custom metrics reporter and make it take effect without going through flink conf

2020-07-25 Thread Congxian Qiu
Hi Fisher,
   Let me try to understand your requirement: you have implemented your own
reporter, you want to compute some metrics in your sources and sinks, and you
want these metrics reported to a destination of your choice via this custom
reporter, without configuring the reporter in flink-conf, is that right?
Could you explain why you prefer configuring it via the env rather than in
flink-conf?

Best,
Congxian


Fisher Xiang wrote on Thu, Jul 23, 2020 at 10:16 PM:

> Hi all,
>
> How can a custom customerReporter that implements MetricReporter,
> CharacterFilter, Scheduled and Reporter be *registered in code via the env
> so that metrics are reported*, given the requirement that the
> customerReporter must not be configured in the flink conf.xml file?
>
> Requirement: count the successfully and unsuccessfully processed records
> inside custom source and sink operators and report them via the custom
> reporter; the reporter needs to be generic, i.e. *usable by multiple flink
> jobs*, to avoid reinventing the wheel.
>
> thx
>
> BR
> Fisher
>
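For the counting part of the requirement (independent of how the reporter gets
registered), a minimal sketch of per-record success/failure counters in a
custom sink could look like the following; the class and metric names are made
up, and the counters are exposed to whichever reporters the cluster has
configured:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CountingSink<T> extends RichSinkFunction<T> {

    private transient Counter successCounter;
    private transient Counter failureCounter;

    @Override
    public void open(Configuration parameters) {
        // register the counters on the operator's metric group
        successCounter = getRuntimeContext().getMetricGroup().counter("numSuccess");
        failureCounter = getRuntimeContext().getMetricGroup().counter("numFailure");
    }

    @Override
    public void invoke(T value, Context context) {
        try {
            // ... write value to the external system ...
            successCounter.inc();
        } catch (Exception e) {
            failureCounter.inc();
        }
    }
}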


Re: flink1.11 writing Kafka data to MySQL

2020-07-25 Thread chengyanan1...@foxmail.com
Hello,

In Flink 1.11.0, when reading Kafka data and writing it to MySQL, you have set
sink.buffer-flush.max-rows to 1000, which means rows are flushed to MySQL only
once 1000 rows have been buffered; that is probably why the data is not
showing up in MySQL right away.
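If the goal is to see rows in MySQL sooner, the flush thresholds in the sink's
WITH clause can be lowered; the values below are only an example, and the
exact flushing behaviour depends on the connector version:

'sink.buffer-flush.max-rows' = '100',
'sink.buffer-flush.interval' = '1s'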



chengyanan1...@foxmail.com
 
Date: 2020-07-24 18:18
To: user-zh
Subject: flink1.11 writing Kafka data to MySQL
Using flink 1.11 to consume Kafka and write to MySQL: Kafka produces roughly 300 records per second, but only about 6 records get written to MySQL.
 
 
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source_Kafka = """
CREATE TABLE kafka_source (
id VARCHAR,
alarm_id VARCHAR,
trck_id VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'test',
'properties.bootstrap.servers' = '*',
'properties.group.id' = 'flink_grouper',
'scan.startup.mode' = 'earliest-offset', 
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
"""
source_W_detail_ddl = """
CREATE TABLE source_W_detail (
id VARCHAR, 
alarm_id VARCHAR,  
trck_id VARCHAR 
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://198.2.2.71:3306/bda?useSSL=false',
'driver' = 'com.mysql.cj.jdbc.Driver',
'table-name' = 'detail',
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.max-rows' = '1000',
'sink.buffer-flush.interval' = '2s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.execute_sql(source_Kafka)
t_env.execute_sql(source_W_detail_ddl)
table_result1=t_env.execute_sql('''insert into source_W_detail select 
id,alarm_id,trck_id from kafka_source''')
table_result1.get_job_client().get_job_execution_result().result()