回复: 回复:MapState 无法更新问题

2021-03-09 文章 明启 孙
你虽然调整了实现方法的顺序,但是这个程序执行顺序还是先执行processElement(),后执行processBroadcastElement()


发件人: chaos
发送时间: 2021年3月10日 14:29
收件人: user-zh@flink.apache.org
主题: Re: 回复:MapState 无法更新问题

主要代码如下:

class getRule extends KeyedBroadcastProcessFunction[String,
KafkaStreamSource, List[Rule], KafkaStreamSource] {

private var carEfenceState: MapState[String, Boolean] = _

override def open(parameters: Configuration): Unit = {
carEfenceState = getRuntimeContext.getMapState(new
MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String],
classOf[Boolean]))
}

override def processBroadcastElement(in2: List[Rule], context:
KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule],
KafkaStreamSource]#Context, collector: Collector[KafkaStreamSource]): Unit =
{
context.getBroadcastState(ruleStateDescriptor).put("rules", in2)
}

override def processElement(kafkaSource: KafkaStreamSource,
readOnlyContext: KeyedBroadcastProcessFunction[String, KafkaStreamSource,
List[Rule], KafkaStreamSource]#ReadOnlyContext, collector:
Collector[KafkaStreamSource]): Unit = {

val ruleIterator =
readOnlyContext.getBroadcastState(ruleStateDescriptor).immutableEntries().iterator()
while (ruleIterator.hasNext) {
val ruleMap: Map.Entry[String, List[Rule]] = 
ruleIterator.next()
val ruleList: List[Rule] = ruleMap.getValue


for (rule <- ruleList) {

val mapKey = kafkaSource.vno + rule.id
val tempState = carEfenceState.get(mapKey)
val currentState = if (tempState != null) 
tempState else false
// 业务逻辑
if (!currentState) {
...
carEfenceState.put(mapKey, true)
...
} else if (currentState) {
...
carEfenceState.remove(mapKey)
...
}
}
}
}
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/
c



回复: 回复:MapState 无法更新问题

2021-03-09 文章 明启 孙
A read-only view of the {@link BroadcastState}.
*
* Although read-only, the user code should not modify the value returned by 
the {@link
* #get(Object)} or the entries of the immutable iterator returned by the {@link
* #immutableEntries()}, as this can lead to inconsistent states. The reason for 
this is that we do
* not create extra copies of the elements for performance reasons.
*
* @param  The key type of the elements in the {@link ReadOnlyBroadcastState}.
* @param  The value type of the elements in the {@link 
ReadOnlyBroadcastState}.
*/
这是源码中对ReadOnlyBroadcastState的描述,希望对你有帮助


smq

发件人: chaos
发送时间: 2021年3月10日 14:29
收件人: user-zh@flink.apache.org
主题: Re: 回复:MapState 无法更新问题

主要代码如下:

class getRule extends KeyedBroadcastProcessFunction[String,
KafkaStreamSource, List[Rule], KafkaStreamSource] {

private var carEfenceState: MapState[String, Boolean] = _

override def open(parameters: Configuration): Unit = {
carEfenceState = getRuntimeContext.getMapState(new
MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String],
classOf[Boolean]))
}

override def processBroadcastElement(in2: List[Rule], context:
KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule],
KafkaStreamSource]#Context, collector: Collector[KafkaStreamSource]): Unit =
{
context.getBroadcastState(ruleStateDescriptor).put("rules", in2)
}

override def processElement(kafkaSource: KafkaStreamSource,
readOnlyContext: KeyedBroadcastProcessFunction[String, KafkaStreamSource,
List[Rule], KafkaStreamSource]#ReadOnlyContext, collector:
Collector[KafkaStreamSource]): Unit = {

val ruleIterator =
readOnlyContext.getBroadcastState(ruleStateDescriptor).immutableEntries().iterator()
while (ruleIterator.hasNext) {
val ruleMap: Map.Entry[String, List[Rule]] = 
ruleIterator.next()
val ruleList: List[Rule] = ruleMap.getValue


for (rule <- ruleList) {

val mapKey = kafkaSource.vno + rule.id
val tempState = carEfenceState.get(mapKey)
val currentState = if (tempState != null) 
tempState else false
// 业务逻辑
if (!currentState) {
...
carEfenceState.put(mapKey, true)
...
} else if (currentState) {
...
carEfenceState.remove(mapKey)
...
}
}
}
}
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/
c



Re: BUG :DataStream 转 Table 后无法 触发窗口计算

2021-03-09 文章 Leonard Xu
你好,
你的flink版本是多少?
之前有个bug是Table转datastream 会丢rowtime问题,看起来是这个问题。

我在[1]里修复了,你可以升级对应的版本试下。


祝好,
Leonard
[1]https://issues.apache.org/jira/browse/FLINK-21013 
 



> 在 2021年3月10日,14:34,HunterXHunter <1356469...@qq.com> 写道:
> 
> 再试了一下:
> 修改并行度也不行
>.setParallelism(9)
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: BUG :DataStream 转 Table 后无法 触发窗口计算

2021-03-09 文章 HunterXHunter
再试了一下:
修改并行度也不行
.setParallelism(9)




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: 频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-09 文章 yidan zhao
希望有大佬给下这些参数的区别。如果环境的网络不好,纠结要调整哪个参数?还是哪些参数。 我目前只提高了 ask.timeout 。
目前看配置,太多与timeout相关的参数了。

akka.ask.timeout
akka.lookup.timeout
akka.retry-gate-closed-for
akka.tcp.timeout
akka.startup-timeout

heartbeat.interval
heartbeat.timeout

high-availability.zookeeper.client.connection-timeout
high-availability.zookeeper.client.session-timeout


taskmanager.network.request-backoff.max

...

yidan zhao  于2021年3月10日周三 下午1:13写道:

> 今天对比了下G1和copy-MarkSweepCompact的效果。
> 运行相同时间, 相同任务。 G1的GC时长更长,但是次数更多,因为每次GC的时间更短。
> 1h15min时间,G1的gc 1100+次,平均每次1s左右。 后者gc 205次,平均每次1.9s左右。
>
> yidan zhao  于2021年3月9日周二 下午7:30写道:
>
>> 补充,还有就是GC收集器,是否无脑使用G1就可以呢?我之前一直是G1,只是最近修改了opts不小心换成其他了。本意不是为了换GC收集器的。
>>
>> yidan zhao  于2021年3月9日周二 下午7:26写道:
>>
>>> 观察了下。CPU什么的有尖刺,但是也算基本正常,因为我的任务就是5分钟一波。基本每5分钟都有个尖刺。
>>> 然后目前通过Flink的web-ui看了下gc情况。
>>> 发现部分集群的fgc的确有问题,fgc平均大概达到10-20s,当然只有平均值,不清楚是否有某些gc时间更长情况。总难题来说10-20s的确是比较长的,这个我之后会去看看改进下。
>>>
>>> (1)不过不清楚这个是否和这个问题直接相关,因为20s的卡顿是否足以引起该问题呢?
>>> (2)此外,大家推荐个内存设置,比如你们都多少TM,每个TM多少内存,跑的任务多大数据量大概。
>>>我目前5个TM的集群,单TM100G内存,跑任务大概10w
>>> qps的入口流量,但是很大部分呢会过滤掉,后续部分流量较少。此外,检查点大概达到3-4GB。
>>>
>>>
>>> Michael Ran  于2021年3月9日周二 下午4:27写道:
>>>
 看看当时的负载呢?有没有过高的情况,是什么原因。然后监控下网络和磁盘
 在 2021-03-09 14:57:43,"yidan zhao"  写道:
 >而且大家推荐怎么设置呢,我可能默认就G1了。不清楚G1是否也需要精调。
 >我目前设置的内存还是比较大的。(50G的,100G的TaskManager都有),这么大heap,是否需要特别设置啥呢?
 >
 >或者是否有必要拆小,比如设置10Gheap,然后把taskmanager数量提上去。
 >
 >yidan zhao  于2021年3月9日周二 下午2:56写道:
 >
 >> 好的,我会看下。
 >> 然后我今天发现我好多个集群GC collector不一样。
 >> 目前发现3种,默认的是G1。flink conf中配置了env.java.opts:
 >> "-XX:-OmitStackTraceInFastThrow"的情况出现了2种,一种是Parallel GC with 83
 >> threads,还有一种是Mark Sweep Compact GC。
 >> 大佬们,Flink是根据内存大小有什么动态调整吗。
 >>
 >>
 >>
 不使用G1我大概理解了,可能设置了java.opts这个是覆盖,不是追加。本身我只是希望设置下-XX:-OmitStackTraceInFastThrow而已。
 >>
 >>
 >> 杨杰 <471419...@qq.com> 于2021年3月8日周一 下午3:09写道:
 >>
 >>> Hi,
 >>>
 >>>   可以排查下 GC 情况,频繁 FGC 也会导致这些情况。
 >>>
 >>> Best,
 >>> jjiey
 >>>
 >>> > 2021年3月8日 14:37,yidan zhao  写道:
 >>> >
 >>> >
 >>>
 如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
 >>> > 除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e
 lost
 >>> > leadership’ 错导致任务重启。
 >>> >
 >>> > 下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
 >>> > 2021-03-08 14:31:40
 >>> > org.apache.flink.runtime.io
 >>> .network.netty.exception.RemoteTransportException:
 >>> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
 >>> >at org.apache.flink.runtime.io.network.netty.
 >>> > CreditBasedPartitionRequestClientHandler.decodeMsg(
 >>> > CreditBasedPartitionRequestClientHandler.java:294)
 >>> >at org.apache.flink.runtime.io.network.netty.
 >>> > CreditBasedPartitionRequestClientHandler.channelRead(
 >>> > CreditBasedPartitionRequestClientHandler.java:183)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> > AbstractChannelHandlerContext.invokeChannelRead(
 >>> > AbstractChannelHandlerContext.java:379)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> > AbstractChannelHandlerContext.invokeChannelRead(
 >>> > AbstractChannelHandlerContext.java:365)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> >
 >>>
 AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
 >>> > .java:357)
 >>> >at org.apache.flink.runtime.io.network.netty.
 >>> > NettyMessageClientDecoderDelegate.channelRead(
 >>> > NettyMessageClientDecoderDelegate.java:115)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> > AbstractChannelHandlerContext.invokeChannelRead(
 >>> > AbstractChannelHandlerContext.java:379)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> > AbstractChannelHandlerContext.invokeChannelRead(
 >>> > AbstractChannelHandlerContext.java:365)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> >
 >>>
 AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
 >>> > .java:357)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> >
 >>>
 DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
 >>> > 1410)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> > AbstractChannelHandlerContext.invokeChannelRead(
 >>> > AbstractChannelHandlerContext.java:379)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> > AbstractChannelHandlerContext.invokeChannelRead(
 >>> > AbstractChannelHandlerContext.java:365)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> >
 DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
 >>> > Abstrac

Re: 如何自定义web-ui上对taskmanager的标识符

2021-03-09 文章 yidan zhao
感觉可以在subtask和taskmanager的2个tab中新加上显示 id
的功能。如何taskmanager支持自定义自己的id(一个用户自定义的随意不重复的id)。

yidan zhao  于2021年3月10日周三 下午2:27写道:

> 如题,目前有几个地方的需求。
> (1)taskmanagers页面的path、id是啥参数,是否仅展示,可随意自定义。
>
> (2)任务点到task后展开的右侧页面中。taskmanagers子tab中的host、以及subtasks子tab中的host是否可以自定义。每次希望看比如subtask-n这个是哪台机器,默认看到host不足以定位机器(我们是容器,因为无法根据host直接登陆),单个host上可能虚拟出多个容器;如果这个host不作为绑定地址,仅作为展示的话是否有地方可以配置。
>
>
>


Re: BUG :DataStream 转 Table 后无法 触发窗口计算

2021-03-09 文章 HunterXHunter
经过再一次验证:
即使我做group by rowtime的操作,
我对datastream做keyby(rowtime) 也有这个问题
例如:
tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime
from test "));
SingleOutputStreamOperator r =
tableEnv.toRetractStream(tableEnv.from("test3"), Row.class)
.filter(x -> x.f0)
.keyby(_.f1)
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.assignTimestampsAndWatermarks(
WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner(((element,
recordTimestamp) -> element.f1))
);

结果也是无法触发窗口



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flinkx????????????????

2021-03-09 文章 ??????
??
??2021??flinkx??
flnkx??jsonbegin1??end1begin2??reader??end2??writer??

{
"job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "username": "root",
            "password": "123456",
            "connection": [{
              "jdbcUrl": 
["jdbc:mysql://localhost:3306/flinkx1?useUnicode=true&characterEncoding=utf8"],
              "table": ["begin1","begin2"]
            }],
            "column": [
                {
                "name": "id",
                "type": "int"
                },
                {
                "name": "name",
                "type": "varchar"
                }
            ],
            "customSql": "",
            "splitPk": "id",
            "queryTimeOut": 1000,
            "requestAccumulatorInterval": 2,
            "increColumn": "id",
            "startLocation": ""
          },
          "name": "mysqlreader"
        },
"writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "123456",
            "connection": [
              {
                "jdbcUrl": 
"jdbc:mysql://localhost:3306/flinkx1?useUnicode=true&characterEncoding=utf8",
                "table": ["end1","end2"]
              }
            ],
            "writeMode": "insert",
            "column": [

           {
                "name": "id",
                "type": "int"
                },
                {
                "name": "name",
                "type": "varchar"
                }
            ],
            "batchSize": 1024
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 2,
        "bytes": 1000
      },
      "errorLimit": {
        "record": 0
      },
      "restore": {
        "maxRowNumForCheckpoint": 0,
        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      },
      "log" : {
        "isLogger": false,
        "level" : "debug",
        "path" : " /data/flinkx/flinkxconf/log/",
        "pattern":""
      }
    }
  }
}

flinkx????????

2021-03-09 文章 ??????
??
??2021??flinkx??
flnkx??jsonbegin1??end1begin2??reader??end2??writer??

{
"job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "username": "root",
            "password": "123456",
            "connection": [{
              "jdbcUrl": 
["jdbc:mysql://localhost:3306/flinkx1?useUnicode=true&characterEncoding=utf8"],
              "table": ["begin1","begin2"]
            }],
            "column": [
                {
                "name": "id",
                "type": "int"
                },
                {
                "name": "name",
                "type": "varchar"
                }
            ],
            "customSql": "",
            "splitPk": "id",
            "queryTimeOut": 1000,
            "requestAccumulatorInterval": 2,
            "increColumn": "id",
            "startLocation": ""
          },
          "name": "mysqlreader"
        },
"writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "123456",
            "connection": [
              {
                "jdbcUrl": 
"jdbc:mysql://localhost:3306/flinkx1?useUnicode=true&characterEncoding=utf8",
                "table": ["end1","end2"]
              }
            ],
            "writeMode": "insert",
            "column": [

           {
                "name": "id",
                "type": "int"
                },
                {
                "name": "name",
                "type": "varchar"
                }
            ],
            "batchSize": 1024
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 2,
        "bytes": 1000
      },
      "errorLimit": {
        "record": 0
      },
      "restore": {
        "maxRowNumForCheckpoint": 0,
        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      },
      "log" : {
        "isLogger": false,
        "level" : "debug",
        "path" : " /data/flinkx/flinkxconf/log/",
        "pattern":""
      }
    }
  }
}

Re: 回复:MapState 无法更新问题

2021-03-09 文章 chaos
你好,

主要代码如下:

class getRule extends KeyedBroadcastProcessFunction[String,
KafkaStreamSource, List[Rule], KafkaStreamSource] {

private var carEfenceState: MapState[String, Boolean] = _

override def open(parameters: Configuration): Unit = {
carEfenceState = getRuntimeContext.getMapState(new
MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String],
classOf[Boolean]))
}

override def processBroadcastElement(in2: List[Rule], context:
KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule],
KafkaStreamSource]#Context, collector: Collector[KafkaStreamSource]): Unit =
{
context.getBroadcastState(ruleStateDescriptor).put("rules", in2)
}

override def processElement(kafkaSource: KafkaStreamSource,
readOnlyContext: KeyedBroadcastProcessFunction[String, KafkaStreamSource,
List[Rule], KafkaStreamSource]#ReadOnlyContext, collector:
Collector[KafkaStreamSource]): Unit = {

val ruleIterator =
readOnlyContext.getBroadcastState(ruleStateDescriptor).immutableEntries().iterator()
while (ruleIterator.hasNext) {
val ruleMap: Map.Entry[String, List[Rule]] = 
ruleIterator.next()
val ruleList: List[Rule] = ruleMap.getValue


for (rule <- ruleList) {

val mapKey = kafkaSource.vno + rule.id
val tempState = carEfenceState.get(mapKey)
val currentState = if (tempState != null) 
tempState else false
// 业务逻辑
if (!currentState) {
...
carEfenceState.put(mapKey, true)
...
} else if (currentState) {
...
carEfenceState.remove(mapKey)
...
}
}
}
}
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/


如何自定义web-ui上对taskmanager的标识符

2021-03-09 文章 yidan zhao
如题,目前有几个地方的需求。
(1)taskmanagers页面的path、id是啥参数,是否仅展示,可随意自定义。
(2)任务点到task后展开的右侧页面中。taskmanagers子tab中的host、以及subtasks子tab中的host是否可以自定义。每次希望看比如subtask-n这个是哪台机器,默认看到host不足以定位机器(我们是容器,因为无法根据host直接登陆),单个host上可能虚拟出多个容器;如果这个host不作为绑定地址,仅作为展示的话是否有地方可以配置。


回复:MapState 无法更新问题

2021-03-09 文章 smq
可以贴个完整的代码吗





-- 原始邮件 --
发件人: chaos http://apache-flink.147419.n8.nabble.com/

Re: Re: 频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-09 文章 yidan zhao
今天对比了下G1和copy-MarkSweepCompact的效果。
运行相同时间, 相同任务。 G1的GC时长更长,但是次数更多,因为每次GC的时间更短。
1h15min时间,G1的gc 1100+次,平均每次1s左右。 后者gc 205次,平均每次1.9s左右。

yidan zhao  于2021年3月9日周二 下午7:30写道:

> 补充,还有就是GC收集器,是否无脑使用G1就可以呢?我之前一直是G1,只是最近修改了opts不小心换成其他了。本意不是为了换GC收集器的。
>
> yidan zhao  于2021年3月9日周二 下午7:26写道:
>
>> 观察了下。CPU什么的有尖刺,但是也算基本正常,因为我的任务就是5分钟一波。基本每5分钟都有个尖刺。
>> 然后目前通过Flink的web-ui看了下gc情况。
>> 发现部分集群的fgc的确有问题,fgc平均大概达到10-20s,当然只有平均值,不清楚是否有某些gc时间更长情况。总难题来说10-20s的确是比较长的,这个我之后会去看看改进下。
>>
>> (1)不过不清楚这个是否和这个问题直接相关,因为20s的卡顿是否足以引起该问题呢?
>> (2)此外,大家推荐个内存设置,比如你们都多少TM,每个TM多少内存,跑的任务多大数据量大概。
>>我目前5个TM的集群,单TM100G内存,跑任务大概10w
>> qps的入口流量,但是很大部分呢会过滤掉,后续部分流量较少。此外,检查点大概达到3-4GB。
>>
>>
>> Michael Ran  于2021年3月9日周二 下午4:27写道:
>>
>>> 看看当时的负载呢?有没有过高的情况,是什么原因。然后监控下网络和磁盘
>>> 在 2021-03-09 14:57:43,"yidan zhao"  写道:
>>> >而且大家推荐怎么设置呢,我可能默认就G1了。不清楚G1是否也需要精调。
>>> >我目前设置的内存还是比较大的。(50G的,100G的TaskManager都有),这么大heap,是否需要特别设置啥呢?
>>> >
>>> >或者是否有必要拆小,比如设置10Gheap,然后把taskmanager数量提上去。
>>> >
>>> >yidan zhao  于2021年3月9日周二 下午2:56写道:
>>> >
>>> >> 好的,我会看下。
>>> >> 然后我今天发现我好多个集群GC collector不一样。
>>> >> 目前发现3种,默认的是G1。flink conf中配置了env.java.opts:
>>> >> "-XX:-OmitStackTraceInFastThrow"的情况出现了2种,一种是Parallel GC with 83
>>> >> threads,还有一种是Mark Sweep Compact GC。
>>> >> 大佬们,Flink是根据内存大小有什么动态调整吗。
>>> >>
>>> >>
>>> >>
>>> 不使用G1我大概理解了,可能设置了java.opts这个是覆盖,不是追加。本身我只是希望设置下-XX:-OmitStackTraceInFastThrow而已。
>>> >>
>>> >>
>>> >> 杨杰 <471419...@qq.com> 于2021年3月8日周一 下午3:09写道:
>>> >>
>>> >>> Hi,
>>> >>>
>>> >>>   可以排查下 GC 情况,频繁 FGC 也会导致这些情况。
>>> >>>
>>> >>> Best,
>>> >>> jjiey
>>> >>>
>>> >>> > 2021年3月8日 14:37,yidan zhao  写道:
>>> >>> >
>>> >>> >
>>> >>>
>>> 如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
>>> >>> > 除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e lost
>>> >>> > leadership’ 错导致任务重启。
>>> >>> >
>>> >>> > 下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
>>> >>> > 2021-03-08 14:31:40
>>> >>> > org.apache.flink.runtime.io
>>> >>> .network.netty.exception.RemoteTransportException:
>>> >>> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
>>> >>> >at org.apache.flink.runtime.io.network.netty.
>>> >>> > CreditBasedPartitionRequestClientHandler.decodeMsg(
>>> >>> > CreditBasedPartitionRequestClientHandler.java:294)
>>> >>> >at org.apache.flink.runtime.io.network.netty.
>>> >>> > CreditBasedPartitionRequestClientHandler.channelRead(
>>> >>> > CreditBasedPartitionRequestClientHandler.java:183)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> >>> > AbstractChannelHandlerContext.java:379)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> >>> > AbstractChannelHandlerContext.java:365)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> >
>>> >>>
>>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>>> >>> > .java:357)
>>> >>> >at org.apache.flink.runtime.io.network.netty.
>>> >>> > NettyMessageClientDecoderDelegate.channelRead(
>>> >>> > NettyMessageClientDecoderDelegate.java:115)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> >>> > AbstractChannelHandlerContext.java:379)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> >>> > AbstractChannelHandlerContext.java:365)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> >
>>> >>>
>>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>>> >>> > .java:357)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> >
>>> >>>
>>> DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
>>> >>> > 1410)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> >>> > AbstractChannelHandlerContext.java:379)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> >>> > AbstractChannelHandlerContext.java:365)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> >
>>> DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
>>> >>> > AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(
>>> >>> > AbstractEpollStreamChannel.java:792)
>>> >>> >at
>>> >>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>>> >>> > .processReady(EpollEventLoop.java:475)
>>> >>> >at
>>> >>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>>> >>> > .run(EpollEventLoop.java:378)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>> >>> > SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>>> >>> >at o

MapState 无法更新问题

2021-03-09 文章 chaos
你好,我在使用广播流的时候定义了一个MapState,并在逻辑处理中往其中放数据,但是我始终没法成功更新其值,忘解惑。 

定义:
private val carEfenceState: MapState[String, Boolean] = new
MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String],
classOf[Boolean])

存值:
carEfenceState.put(mapKey, true)

取值:
carEfenceState.get(mapKey)

取到的值始终为 false.

Thanks in advance!



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Job 如何集成到自己的系统,方便管理

2021-03-09 文章 9722
有啊,现在很多公司都做了数据平台,并且对外出售,你可以找一个公司的产品试用,然后照着它的功能,自己开发

> 2021年3月6日 16:00,Jacob <17691150...@163.com> 写道:
> 
> 我们现在提交Flink Job 是通过flink客户端run命令提交job,进行实时任务的计算,每次提交都要登录prd机器,上传jar包,过程比较麻烦。 
> 
> 
> 后期规划把实时计算的任务集成到我们已有的一个系统中,把上面描述的过程封装起来,给用户提供一些按钮、菜单等,理想状态下,在这个系统增加一些模块、菜单之类的东西,就能完成对Job的维护,包括提交Job、查看正在运行的Job、停止Job等等
>   
> 
> 上面所说的这个系统是我们自研的一个数据处理平台,实时计算任务也是其中的一环,因此就想把实时计算的任务的模块也集成到其中去。
> 
> 
> 不知道这有没有可能实现
> 
> 请大佬提供些许思路!感谢
> 
> 
> 
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Flink Job 如何集成到自己的系统,方便管理

2021-03-09 文章 silence
个人也维护了个flink平台的开源项目,希望可以帮助到你
https://github.com/hairless/plink



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Job 如何集成到自己的系统,方便管理

2021-03-09 文章 HunterXHunter
https://github.com/zhp8341/flink-streaming-platform-web
这个你可以参考下



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Pyflink 提交到本地集群报错

2021-03-09 文章 Shuiqiang Chen
Huilin 你好,

你用的是哪个版本的Flink呢?

Huilin_WU <592900...@qq.com> 于2021年3月10日周三 上午9:39写道:

> 我在terminal中用python xx.py文件就可以执行,然而用flink run -m localhost:8081 -py
> xx.py就会报上面的错误说没有pyflink的组件。
> (base) huilin@huilin-Lenovo:~/Documents/Learning/experiment$ flink run -m
> localhost:8081 -py demo_predict.py
> Traceback (most recent call last):
>   File "demo_predict.py", line 51, in 
> from pyflink.common.serialization import SimpleStringEncoder
> ModuleNotFoundError: No module named 'pyflink.common.serialization'
>
> 我已经试了很多方法,创建了虚拟环境在里面安装了对应的包,还是不行。请问有什么解决办法?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Pyflink 提交到本地集群报错

2021-03-09 文章 Huilin_WU
我在terminal中用python xx.py文件就可以执行,然而用flink run -m localhost:8081 -py
xx.py就会报上面的错误说没有pyflink的组件。
(base) huilin@huilin-Lenovo:~/Documents/Learning/experiment$ flink run -m
localhost:8081 -py demo_predict.py
Traceback (most recent call last):
  File "demo_predict.py", line 51, in 
from pyflink.common.serialization import SimpleStringEncoder
ModuleNotFoundError: No module named 'pyflink.common.serialization'

我已经试了很多方法,创建了虚拟环境在里面安装了对应的包,还是不行。请问有什么解决办法?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


退订

2021-03-09 文章 Miro.Zheng
退订

flink 1.12.2??????????????????????????????????????????????????

2021-03-09 文章 Asahi Lee
??        ??flink 
1.12??flinkDataStream 
API,??RuntimeExecutionMode.BATCH??
 

package com.meritdata.cloud.tempo.dw.flink.test.bug;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class JDBCTest {

public static void main(String[] args) {
test();
/**
 * ??
 * EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
 * .useBlinkPlanner().inBatchMode().build();
 * TableEnvironment bbTableEnv = 
TableEnvironment.create(bbSettings);
 * ++--+
 * |  a |   EXPR$1 |
 * ++--+
 * |  2 |1 |
 * |  3 |2 |
 * |  1 |2 |
 * |  4 |1 |
 * ++--+
 */
//test1();

/**
 * ??API
 * StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
 * streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
 * StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(streamEnv);
 * +++--+
 * | op |  a |   EXPR$1 |
 * +++--+
 * | +I |  2 |1 |
 * | +I |  1 |1 |
 * | +I |  4 |1 |
 * | -U |  1 |1 |
 * | +U |  1 |2 |
 * | +I |  3 |1 |
 * | -U |  3 |1 |
 * | +U |  3 |2 |
 * +++--+
 */
}

public static void test() {
EnvironmentSettings bbSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

bbTableEnv.executeSql("CREATE TABLE ab (" +
"  a STRING, " +
"  b INT " +
") WITH (" +
"   'connector' = 'jdbc'," +
"   'url' = 
'jdbc:mysql://localhost:3306/a?serverTimezone=UTC'," +
"   'username' = 'root'," +
"   'password' = 'root'," +
"   'table-name' = 'ab'" +
" )");

bbTableEnv.sqlQuery("select a, count(b) from ab group by 
a").execute().print();

}

public static void test1() {
StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(streamEnv);

tableEnv.executeSql("CREATE TABLE ab (" +
"  a STRING, " +
"  b INT " +
") WITH (" +
"   'connector' = 'jdbc'," +
"   'url' = 
'jdbc:mysql://localhost:3306/a?serverTimezone=UTC'," +
"   'username' = 'root'," +
"   'password' = 'root'," +
"   'table-name' = 'ab'" +
" )");

tableEnv.sqlQuery("select a, count(b) from ab group by 
a").execute().print();
}

}

Re: Re: 频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-09 文章 yidan zhao
补充,还有就是GC收集器,是否无脑使用G1就可以呢?我之前一直是G1,只是最近修改了opts不小心换成其他了。本意不是为了换GC收集器的。

yidan zhao  于2021年3月9日周二 下午7:26写道:

> 观察了下。CPU什么的有尖刺,但是也算基本正常,因为我的任务就是5分钟一波。基本每5分钟都有个尖刺。
> 然后目前通过Flink的web-ui看了下gc情况。
> 发现部分集群的fgc的确有问题,fgc平均大概达到10-20s,当然只有平均值,不清楚是否有某些gc时间更长情况。总难题来说10-20s的确是比较长的,这个我之后会去看看改进下。
>
> (1)不过不清楚这个是否和这个问题直接相关,因为20s的卡顿是否足以引起该问题呢?
> (2)此外,大家推荐个内存设置,比如你们都多少TM,每个TM多少内存,跑的任务多大数据量大概。
>我目前5个TM的集群,单TM100G内存,跑任务大概10w
> qps的入口流量,但是很大部分呢会过滤掉,后续部分流量较少。此外,检查点大概达到3-4GB。
>
>
> Michael Ran  于2021年3月9日周二 下午4:27写道:
>
>> 看看当时的负载呢?有没有过高的情况,是什么原因。然后监控下网络和磁盘
>> 在 2021-03-09 14:57:43,"yidan zhao"  写道:
>> >而且大家推荐怎么设置呢,我可能默认就G1了。不清楚G1是否也需要精调。
>> >我目前设置的内存还是比较大的。(50G的,100G的TaskManager都有),这么大heap,是否需要特别设置啥呢?
>> >
>> >或者是否有必要拆小,比如设置10Gheap,然后把taskmanager数量提上去。
>> >
>> >yidan zhao  于2021年3月9日周二 下午2:56写道:
>> >
>> >> 好的,我会看下。
>> >> 然后我今天发现我好多个集群GC collector不一样。
>> >> 目前发现3种,默认的是G1。flink conf中配置了env.java.opts:
>> >> "-XX:-OmitStackTraceInFastThrow"的情况出现了2种,一种是Parallel GC with 83
>> >> threads,还有一种是Mark Sweep Compact GC。
>> >> 大佬们,Flink是根据内存大小有什么动态调整吗。
>> >>
>> >>
>> >>
>> 不使用G1我大概理解了,可能设置了java.opts这个是覆盖,不是追加。本身我只是希望设置下-XX:-OmitStackTraceInFastThrow而已。
>> >>
>> >>
>> >> 杨杰 <471419...@qq.com> 于2021年3月8日周一 下午3:09写道:
>> >>
>> >>> Hi,
>> >>>
>> >>>   可以排查下 GC 情况,频繁 FGC 也会导致这些情况。
>> >>>
>> >>> Best,
>> >>> jjiey
>> >>>
>> >>> > 2021年3月8日 14:37,yidan zhao  写道:
>> >>> >
>> >>> >
>> >>>
>> 如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
>> >>> > 除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e lost
>> >>> > leadership’ 错导致任务重启。
>> >>> >
>> >>> > 下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
>> >>> > 2021-03-08 14:31:40
>> >>> > org.apache.flink.runtime.io
>> >>> .network.netty.exception.RemoteTransportException:
>> >>> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
>> >>> >at org.apache.flink.runtime.io.network.netty.
>> >>> > CreditBasedPartitionRequestClientHandler.decodeMsg(
>> >>> > CreditBasedPartitionRequestClientHandler.java:294)
>> >>> >at org.apache.flink.runtime.io.network.netty.
>> >>> > CreditBasedPartitionRequestClientHandler.channelRead(
>> >>> > CreditBasedPartitionRequestClientHandler.java:183)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>> >>> > AbstractChannelHandlerContext.java:379)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>> >>> > AbstractChannelHandlerContext.java:365)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> >
>> >>>
>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>> >>> > .java:357)
>> >>> >at org.apache.flink.runtime.io.network.netty.
>> >>> > NettyMessageClientDecoderDelegate.channelRead(
>> >>> > NettyMessageClientDecoderDelegate.java:115)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>> >>> > AbstractChannelHandlerContext.java:379)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>> >>> > AbstractChannelHandlerContext.java:365)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> >
>> >>>
>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>> >>> > .java:357)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> >
>> >>>
>> DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
>> >>> > 1410)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>> >>> > AbstractChannelHandlerContext.java:379)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>> >>> > AbstractChannelHandlerContext.java:365)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> >
>> DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
>> >>> > AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(
>> >>> > AbstractEpollStreamChannel.java:792)
>> >>> >at
>> >>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>> >>> > .processReady(EpollEventLoop.java:475)
>> >>> >at
>> >>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>> >>> > .run(EpollEventLoop.java:378)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>> >>> > SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.util.internal.
>> >>> > ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>> >>> >at java.lang.Thread.run(Thread.java:748)
>> >>> > Caused by: org.apache.flink.runtime.io.network.partition.
>> >>> > ProducerFailedException: org.apache.flink.util

Re: Re: 频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-09 文章 yidan zhao
观察了下。CPU什么的有尖刺,但是也算基本正常,因为我的任务就是5分钟一波。基本每5分钟都有个尖刺。
然后目前通过Flink的web-ui看了下gc情况。
发现部分集群的fgc的确有问题,fgc平均大概达到10-20s,当然只有平均值,不清楚是否有某些gc时间更长情况。总难题来说10-20s的确是比较长的,这个我之后会去看看改进下。

(1)不过不清楚这个是否和这个问题直接相关,因为20s的卡顿是否足以引起该问题呢?
(2)此外,大家推荐个内存设置,比如你们都多少TM,每个TM多少内存,跑的任务多大数据量大概。
   我目前5个TM的集群,单TM100G内存,跑任务大概10w
qps的入口流量,但是很大部分呢会过滤掉,后续部分流量较少。此外,检查点大概达到3-4GB。


Michael Ran  于2021年3月9日周二 下午4:27写道:

> 看看当时的负载呢?有没有过高的情况,是什么原因。然后监控下网络和磁盘
> 在 2021-03-09 14:57:43,"yidan zhao"  写道:
> >而且大家推荐怎么设置呢,我可能默认就G1了。不清楚G1是否也需要精调。
> >我目前设置的内存还是比较大的。(50G的,100G的TaskManager都有),这么大heap,是否需要特别设置啥呢?
> >
> >或者是否有必要拆小,比如设置10Gheap,然后把taskmanager数量提上去。
> >
> >yidan zhao  于2021年3月9日周二 下午2:56写道:
> >
> >> 好的,我会看下。
> >> 然后我今天发现我好多个集群GC collector不一样。
> >> 目前发现3种,默认的是G1。flink conf中配置了env.java.opts:
> >> "-XX:-OmitStackTraceInFastThrow"的情况出现了2种,一种是Parallel GC with 83
> >> threads,还有一种是Mark Sweep Compact GC。
> >> 大佬们,Flink是根据内存大小有什么动态调整吗。
> >>
> >>
> >>
> 不使用G1我大概理解了,可能设置了java.opts这个是覆盖,不是追加。本身我只是希望设置下-XX:-OmitStackTraceInFastThrow而已。
> >>
> >>
> >> 杨杰 <471419...@qq.com> 于2021年3月8日周一 下午3:09写道:
> >>
> >>> Hi,
> >>>
> >>>   可以排查下 GC 情况,频繁 FGC 也会导致这些情况。
> >>>
> >>> Best,
> >>> jjiey
> >>>
> >>> > 2021年3月8日 14:37,yidan zhao  写道:
> >>> >
> >>> >
> >>>
> 如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
> >>> > 除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e lost
> >>> > leadership’ 错导致任务重启。
> >>> >
> >>> > 下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
> >>> > 2021-03-08 14:31:40
> >>> > org.apache.flink.runtime.io
> >>> .network.netty.exception.RemoteTransportException:
> >>> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
> >>> >at org.apache.flink.runtime.io.network.netty.
> >>> > CreditBasedPartitionRequestClientHandler.decodeMsg(
> >>> > CreditBasedPartitionRequestClientHandler.java:294)
> >>> >at org.apache.flink.runtime.io.network.netty.
> >>> > CreditBasedPartitionRequestClientHandler.channelRead(
> >>> > CreditBasedPartitionRequestClientHandler.java:183)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:379)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:365)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> >
> >>>
> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
> >>> > .java:357)
> >>> >at org.apache.flink.runtime.io.network.netty.
> >>> > NettyMessageClientDecoderDelegate.channelRead(
> >>> > NettyMessageClientDecoderDelegate.java:115)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:379)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:365)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> >
> >>>
> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
> >>> > .java:357)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> >
> >>>
> DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
> >>> > 1410)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:379)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:365)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> >
> DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
> >>> > AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(
> >>> > AbstractEpollStreamChannel.java:792)
> >>> >at
> >>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
> >>> > .processReady(EpollEventLoop.java:475)
> >>> >at
> >>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
> >>> > .run(EpollEventLoop.java:378)
> >>> >at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
> >>> > SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> >>> >at org.apache.flink.shaded.netty4.io.netty.util.internal.
> >>> > ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> >>> >at java.lang.Thread.run(Thread.java:748)
> >>> > Caused by: org.apache.flink.runtime.io.network.partition.
> >>> > ProducerFailedException: org.apache.flink.util.FlinkException:
> >>> JobManager
> >>> > responsible for eb5d2893c4c6f4034995b9c8e180f01e lost the leadership.
> >>> >at org.apache.flink.runtime.io
> .network.netty.PartitionRequestQueue
> >>> > .writeAndFlushNextMessageIfPos

PyFlink UDTF ???????????????? NullPointerException

2021-03-09 文章 ??????
PyFlink UDTFUDTF


class Mac(TableFunction):
    def eval(self, body_data):
        mac_arr = [
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},
                
{"mac":"08:10:79:67:16:1b","rssi":"-85","router":"Netcore_67161B","range":"55.0"}
                ]
        return mac_arr


table_env.create_temporary_function("mac", udtf(Mac(), 
result_types=[DataTypes.MAP(DataTypes.STRING(),DataTypes.STRING())]))



??


INSERT INTO MacTableSink select data['mac'] mac, data['rssi'] rssi, 
data['router'] router, data['range'] distance from KafkaMacTable, lateral 
table(mac(body_data)) as T(data)


??


2021-03-09 17:19:29
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
    at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
    at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceiv

Re:Re: flink sql 这种实时计算结果如何与离线计算的结果做数据比对?

2021-03-09 文章 Michael Ran
1.两套逻辑结果,只能定时任务做check2.同一套逻辑,就要具体分析了,只要不是一个人、一套代码逻辑出来的,都有可能出问题
在 2021-03-09 12:51:50,"Smile"  写道:
>对,离线和实时的计算语义本来就是不一样的,所以这个地方也没有特别完美的解决方案,一般都是 case by case 看一下。
>有一些显而易见的问题比如 Join 是否关联成功这种还是比较容易查,其他的确实不太好判断。
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: 频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-09 文章 Michael Ran
看看当时的负载呢?有没有过高的情况,是什么原因。然后监控下网络和磁盘
在 2021-03-09 14:57:43,"yidan zhao"  写道:
>而且大家推荐怎么设置呢,我可能默认就G1了。不清楚G1是否也需要精调。
>我目前设置的内存还是比较大的。(50G的,100G的TaskManager都有),这么大heap,是否需要特别设置啥呢?
>
>或者是否有必要拆小,比如设置10Gheap,然后把taskmanager数量提上去。
>
>yidan zhao  于2021年3月9日周二 下午2:56写道:
>
>> 好的,我会看下。
>> 然后我今天发现我好多个集群GC collector不一样。
>> 目前发现3种,默认的是G1。flink conf中配置了env.java.opts:
>> "-XX:-OmitStackTraceInFastThrow"的情况出现了2种,一种是Parallel GC with 83
>> threads,还有一种是Mark Sweep Compact GC。
>> 大佬们,Flink是根据内存大小有什么动态调整吗。
>>
>>
>> 不使用G1我大概理解了,可能设置了java.opts这个是覆盖,不是追加。本身我只是希望设置下-XX:-OmitStackTraceInFastThrow而已。
>>
>>
>> 杨杰 <471419...@qq.com> 于2021年3月8日周一 下午3:09写道:
>>
>>> Hi,
>>>
>>>   可以排查下 GC 情况,频繁 FGC 也会导致这些情况。
>>>
>>> Best,
>>> jjiey
>>>
>>> > 2021年3月8日 14:37,yidan zhao  写道:
>>> >
>>> >
>>> 如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
>>> > 除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e lost
>>> > leadership’ 错导致任务重启。
>>> >
>>> > 下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
>>> > 2021-03-08 14:31:40
>>> > org.apache.flink.runtime.io
>>> .network.netty.exception.RemoteTransportException:
>>> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
>>> >at org.apache.flink.runtime.io.network.netty.
>>> > CreditBasedPartitionRequestClientHandler.decodeMsg(
>>> > CreditBasedPartitionRequestClientHandler.java:294)
>>> >at org.apache.flink.runtime.io.network.netty.
>>> > CreditBasedPartitionRequestClientHandler.channelRead(
>>> > CreditBasedPartitionRequestClientHandler.java:183)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:379)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:365)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >
>>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>>> > .java:357)
>>> >at org.apache.flink.runtime.io.network.netty.
>>> > NettyMessageClientDecoderDelegate.channelRead(
>>> > NettyMessageClientDecoderDelegate.java:115)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:379)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:365)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >
>>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>>> > .java:357)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >
>>> DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
>>> > 1410)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:379)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:365)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
>>> > AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(
>>> > AbstractEpollStreamChannel.java:792)
>>> >at
>>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>>> > .processReady(EpollEventLoop.java:475)
>>> >at
>>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>>> > .run(EpollEventLoop.java:378)
>>> >at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>> > SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>>> >at org.apache.flink.shaded.netty4.io.netty.util.internal.
>>> > ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>>> >at java.lang.Thread.run(Thread.java:748)
>>> > Caused by: org.apache.flink.runtime.io.network.partition.
>>> > ProducerFailedException: org.apache.flink.util.FlinkException:
>>> JobManager
>>> > responsible for eb5d2893c4c6f4034995b9c8e180f01e lost the leadership.
>>> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
>>> > .writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:221)
>>> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
>>> > .enqueueAvailableReader(PartitionRequestQueue.java:108)
>>> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
>>> > .userEventTriggered(PartitionRequestQueue.java:170)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeUserEventTriggered(
>>> > AbstractChannelHandlerContext.java:346)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeUserEventTrigge