Flink on YARN: JDK version support issue

2021-01-23 Thread Jacob
When submitting a job to the Hadoop cluster with the Flink 1.11.2 client, the following error is reported:

LogType:jobmanager.err
Log Upload Time:Sat Jan 23 00:06:47 -0800 2021
LogLength:160
Log Contents:
Unrecognized VM option 'MaxMetaspaceSize=268435456'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

Is this caused by the Hadoop cluster's JDK version being too old?


What is known so far is that the Hadoop cluster runs JDK 1.7.


I used to assume that *env.java.home* in the Flink configuration file should point to the Hadoop cluster's Java home, but testing showed that it does not: it is the Java home of the (local) client machine. That Java version is already 1.8+, yet submitting the job still fails with the error above.



Is the old JDK on the Hadoop cluster the cause? If the cluster's JDK is upgraded, does env.java.home in the Flink configuration need to be changed to the cluster's Java home, or should it stay as the local Java home path?

What roles do these two JDKs (the configured env.java.home and the Hadoop cluster's Java home) play when starting a Flink job?
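
(For context, and only as a sketch: MaxMetaspaceSize exists only in JDK 8 and later, while JDK 7 still uses PermGen, so a JobManager launched with the cluster's JDK 7 would reject that flag. As observed above, env.java.home only affects the client side; to point the YARN containers at a newer JDK, the containerized environment variables can be set in flink-conf.yaml. The JDK path below is a placeholder, not taken from this thread.)

# flink-conf.yaml (sketch; point the path at an actual JDK 8 install present on the NodeManager hosts)
env.java.home: /usr/java/jdk1.8.0                             # JDK used on the client side
containerized.master.env.JAVA_HOME: /usr/java/jdk1.8.0        # JDK for the JobManager container on YARN
containerized.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0   # JDK for the TaskManager containers on YARN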







-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink parallelism question

2021-01-23 Thread Jacob
Thanks for the reply.

I'm not sure what you mean by saying the parallelism can be set arbitrarily.


When I submit a job in on-YARN mode with, say, a parallelism of 25, the Hadoop YARN portal shows VCores Used = 25, i.e. it has requested 25 CPUs.

But in standalone mode (3 machines, 4 CPUs each), is the maximum parallelism I can set 12? Going beyond that produces errors such as insufficient resources.

In fact, on such a three-machine Flink cluster the number of available slots is often less than 12 rather than exactly 12. Is that because the remaining CPU at that moment is insufficient, so fewer than 12 slots can be obtained?






-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Flink parallelism question

2021-01-23 Thread Jacob
Thanks for the reply~

My understanding was that the parallelism should not exceed the number of CPUs.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Choosing the right Flink state for the business requirement

2021-01-23 Thread (sender name garbled in the archive; the signature shows jjw8610...@163.com)

[The body of this message was mangled by an encoding problem in the archive. The recoverable fragments mention state TTL and a keyBy on key1, and quote a 2021-01-23 15:43 message from <25977...@qq.com> that links to
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
and references news_...@163.com.]

Unsubscribe

2021-01-23 Thread 纪军伟


Unsubscribe
纪军伟
jjw8610...@163.com
Signature customized by NetEase Mail Master



Re: Choosing the right Flink state for the business requirement

2021-01-23 Thread 张锴
*I grouped by session window, defined min/max state, and re-ran the job using the reduce + processWindowFunction approach, but in the output both duration (seconds) and duration_time (HH:mm:ss) are 0. My program is below; please help me figure out where the definition is wrong and why this happens. It has been puzzling me for quite a while.*

case class CloudLiveLogOnLine(
 id: Long,
 courseId: Long,
 customerId: Long,
 courseNumber: Long,
 nickName: String,
 partnerId: Long,
 ip: String,
 reportTime: String,
 liveType: Int,
 uid: String,
 eventTime: Long
   )

case class MinMaxTemp(
   id: Long,
   courseId: Long,
   customerId: Long,
   courseNumber: Long,
   nickName: String,
   partnerId: Long,
   ip: String,
   reportTime: String,
   liveType: Int,
   uid: String,
   mineventTime: Long,
   maxeventTime: Long
 )

object OnLineFlinkTask {

  def main(args: Array[String]): Unit = {

// configuration omitted

 val dataStream: DataStream[CloudLiveLogOnLine] = stream.map(line => {
  var id = 0L
  var courseId = 0L
  var courseNumber = 0L
  var customerId = 0L
  var nickName = ""
  var partnerId = 0L
  var ip = ""
  var reportTime = ""
  var liveType = 0
  var uid = ""
  var eventTime = 0L
  try {
val messageJson = JSON.parseObject(line)
val data: JSONObject = messageJson.getJSONObject("data")
id = data.getLong("id")
courseId = data.getLongValue("courseId")
courseNumber = data.getLongValue("courseNumber")
customerId = data.getLongValue("customerId")
nickName = data.getString("nickName")
partnerId = data.getLongValue("partnerId")
ip = data.getString("ip")
reportTime = data.getString("reportTime")
liveType = data.getIntValue("liveType")
uid = data.getString("uid")
eventTime = messageJson.getLongValue("eventTime")
  } catch {
case e => println(line)
  }
  CloudLiveLogOnLine(id, courseId, customerId, courseNumber,
nickName, partnerId, ip, reportTime, liveType, uid, eventTime)
}).assignAscendingTimestamps(_.eventTime)

// 3. transform: process the data
    val ds = dataStream
      .filter(_.liveType == 1)
      .map(r => MinMaxTemp(r.id, r.courseId, r.customerId, r.courseNumber, r.nickName, r.partnerId,
        r.ip, r.reportTime, r.liveType, r.uid, r.eventTime, r.eventTime))
      .keyBy(1, 2)
      .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
      .reduce(new myReduceFunc(), new AssignWindowProcessFunc())

ds.print()
env.execute("flink job")

   }

}

class myReduceFunc() extends ReduceFunction[MinMaxTemp] {
  override def reduce(value1: MinMaxTemp, value2: MinMaxTemp): MinMaxTemp = {
    MinMaxTemp(value1.id, value1.courseId, value1.customerId, value1.courseNumber, value1.nickName,
      value1.partnerId, value1.ip, value1.reportTime, value1.liveType, value1.uid,
      value1.mineventTime.min(value2.mineventTime),
      value1.maxeventTime.max(value2.maxeventTime))
  }
}

class AssignWindowProcessFunc() extends
    ProcessWindowFunction[MinMaxTemp, CloudliveWatcher, Tuple, TimeWindow] {

  private var minTsState: ValueState[Long] = _
  private var maxTsState: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    minTsState = getRuntimeContext.getState(new ValueStateDescriptor[Long]("min-state", classOf[Long]))
    maxTsState = getRuntimeContext.getState(new ValueStateDescriptor[Long]("max-state", classOf[Long]))
  }

  override def process(key: Tuple, context: Context, elements:
Iterable[MinMaxTemp], out: Collector[CloudliveWatcher]): Unit = {
val minTs: Long = minTsState.value() // read the previous minimum timestamp
val maxTs: Long = maxTsState.value() // read the previous maximum timestamp

val device_type = 0
val net_opretor = ""
val net_type = ""
val area = ""
val plat_form = ""
val network_operator = ""
val role = 0
val useragent = ""
val currentDate = DateUtil.currentDate
val created_time = currentDate
val modified_time = currentDate
var id =0L
var courseId =0L
var partnerId =0L
var ip =""
var customerId =0L
var courseNumber =0L
var nickName =""
var liveType =0
var uid =""
var eventTime =0L
var min =0L
var max =0L
var join_time =""
var leave_time =""
var duration =0L
var duration_time =""
val iterator: Iterator[MinMaxTemp] = elements.iterator
if (iterator.hasNext) {
  val value: MinMaxTemp = iterator.next()
  id = value.id
  courseId= value.courseId
  partnerId = value.partnerId
  ip = value.ip
  customerId = value.customerId
  courseNumber = value.courseNumber
  nickName = value.nickName

Re: Looking for a simple example of writing ORC files from Flink, specifically how to write the Map complex type

2021-01-23 Thread Kezhu Wang
https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java#L259


On January 23, 2021 at 13:49:23, 赵一旦 (hinobl...@gmail.com) wrote:

At the moment I use a custom OrcBulkWriterFactory, obtain the ColumnVectors one by one, and set their values. For simple types the API looks clear, but I cannot figure out how to write a Map type. As shown below, for serverTime (an INT) it is enough to set vector[rowId] directly. But how do I populate a MapColumnVector, i.e. how exactly do I write several key-value pairs into it?


serverTimeColumnVector.vector[rowId] = ele.getTimestamp();

MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2];
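
For what it's worth, below is a rough Java sketch (not taken from this thread) of filling a MapColumnVector for one row, assuming String keys and long values; class and method names are illustrative. The key point is that keys and values are flat child vectors addressed through offsets[row], lengths[row] and childCount, in the same way ListColumnVector works. The Iceberg writer linked above handles the general case.

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;

public final class MapVectorSketch {

    // Writes one Map<String, Long> into row rowId of the map column.
    public static void setMap(MapColumnVector mapVector, int rowId, Map<String, Long> entries) {
        // This row's entries start where the previous rows' entries ended.
        mapVector.offsets[rowId] = mapVector.childCount;
        mapVector.lengths[rowId] = entries.size();
        mapVector.childCount += entries.size();

        // Grow the child vectors if needed, preserving already-written data.
        mapVector.keys.ensureSize(mapVector.childCount, true);
        mapVector.values.ensureSize(mapVector.childCount, true);

        BytesColumnVector keyVector = (BytesColumnVector) mapVector.keys;
        LongColumnVector valueVector = (LongColumnVector) mapVector.values;

        int pos = (int) mapVector.offsets[rowId];
        for (Map.Entry<String, Long> e : entries.entrySet()) {
            keyVector.setVal(pos, e.getKey().getBytes(StandardCharsets.UTF_8));
            valueVector.vector[pos] = e.getValue();
            pos++;
        }
    }
}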


Re: Choosing the right Flink state for the business requirement

2021-01-23 Thread 赵一旦
I'm not very familiar with Scala and your code looks quite complex; here is some pseudocode.
Ele{
   id1;
   id2;
   ts; //timestamp
  minTs = ts;
  maxTs = ts;
}

keyBy(id1,id2).window(EventTimeSessionWindows.withGap(Time.minutes(1))).reduce(xxxReduce,
xxxProcessWindowFunc).

xxxReduce(Ele ele1, Ele ele2){
  return new Ele(ele1.id1, ele1.id2, null, min(ele1.minTs, ele2.minTs),
max(ele1.maxTs, ele2.maxTs) )
}

xxxProcessWindowFunc(Iterable eles){
  Ele ele = eles.iterator().next()
  durationMs = ele.maxTs - ele.minTs;
  collectOut(new XX(ele.id1, ele.id2, ele.minTs, ele.maxTs, duration))
  // --> this represents one appearance of (id1, id2): it starts at minTs, ends at maxTs, with the given duration.
}
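
For reference, a compilable Java sketch of the reduce + ProcessWindowFunction pattern sketched above (class and field names are illustrative, and it assumes event-time timestamps and watermarks are already assigned upstream):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SessionDurationSketch {

    // POJO carrying the key fields plus the running min/max event timestamps.
    public static class Ele {
        public long id1;
        public long id2;
        public long minTs;
        public long maxTs;

        public Ele() {}

        public Ele(long id1, long id2, long minTs, long maxTs) {
            this.id1 = id1; this.id2 = id2; this.minTs = minTs; this.maxTs = maxTs;
        }
    }

    public static DataStream<String> sessionDurations(DataStream<Ele> input) {
        return input
            .keyBy(e -> e.id1 + "_" + e.id2)                               // composite key
            .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
            .reduce(
                // Incrementally keep only the min/max timestamps per key and session window.
                (ReduceFunction<Ele>) (a, b) ->
                    new Ele(a.id1, a.id2, Math.min(a.minTs, b.minTs), Math.max(a.maxTs, b.maxTs)),
                // The window function sees exactly one pre-aggregated element,
                // so no extra keyed ValueState is needed here.
                new ProcessWindowFunction<Ele, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx, Iterable<Ele> elements,
                                        Collector<String> out) {
                        Ele ele = elements.iterator().next();
                        long durationMs = ele.maxTs - ele.minTs;
                        out.collect(key + ": duration " + durationMs + " ms");
                    }
                });
    }
}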


张锴  wrote on Sat, Jan 23, 2021 at 5:35 PM:

> *I grouped by session window, defined min/max state, and re-ran the job using the
> reduce + processWindowFunction approach, but in the output both duration (seconds)
> and duration_time (HH:mm:ss) are 0. My program is below; please help me figure out
> where the definition is wrong and why this happens.*
>
> [...]

Re: Looking for a simple example of writing ORC files from Flink, specifically how to write the Map complex type

2021-01-23 Thread 赵一旦
Thanks a lot. That wrapper is really nice; the whole class can be reused directly.

Kezhu Wang  wrote on Sat, Jan 23, 2021 at 6:00 PM:

>
> https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java#L259
>
>
> On January 23, 2021 at 13:49:23, 赵一旦 (hinobl...@gmail.com) wrote:
>
> At the moment I use a custom OrcBulkWriterFactory, obtain the ColumnVectors one by
> one, and set their values. For simple types the API looks clear, but I cannot figure
> out how to write a Map type. As shown below, for serverTime (an INT) it is enough to
> set vector[rowId] directly. But how do I populate a MapColumnVector, i.e. how exactly
> do I write several key-value pairs into it?
>
>
> serverTimeColumnVector.vector[rowId] = ele.getTimestamp();
>
> MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2];
>


Re: Re: Flink parallelism question

2021-01-23 Thread 赵一旦
By that reasoning, never mind servers, your own laptop could not work either. It only has a few cores, yet right after booting the number of processes already exceeds the number of CPUs. Do those processes not run?

Multithreading means exactly that: an N-core machine is not limited to N threads.

In standalone mode you can configure as many slots per machine as you like; it is unrelated to the machine's CPU count. Unrelated.
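
(For reference, only as a sketch: in standalone mode the slots a TaskManager offers are simply whatever is configured on that worker, for example in flink-conf.yaml; the value below is a placeholder.)

# flink-conf.yaml on each standalone worker
taskmanager.numberOfTaskSlots: 4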

Jacob <17691150...@163.com> wrote on Sat, Jan 23, 2021 at 4:35 PM:

> Thanks for the reply~
>
> My understanding was that the parallelism should not exceed the number of CPUs.
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: Flink parallelism question

2021-01-23 Thread 赵一旦
On YARN it is a different story: YARN has its own way of measuring resources, namely vcores.
If you set your job's parallelism to 25, by default YARN probably maps one parallel subtask to one vcore; that is all it means. There are certainly parameters to adjust this, and mapping one subtask to 10 vcores is perfectly fine too.
It is just a unit of accounting.
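
(For reference, a sketch of the Flink configuration options involved; the values are placeholders, not recommendations.)

# flink-conf.yaml
yarn.containers.vcores: 2          # vcores requested per TaskManager container on YARN
taskmanager.numberOfTaskSlots: 4   # parallel subtasks (slots) each TaskManager can run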

赵一旦  wrote on Sat, Jan 23, 2021 at 8:20 PM:

> By that reasoning, never mind servers, your own laptop could not work either. It only
> has a few cores, yet right after booting the number of processes already exceeds the
> number of CPUs. Do those processes not run?
>
> Multithreading means exactly that: an N-core machine is not limited to N threads.
>
> In standalone mode you can configure as many slots per machine as you like; it is
> unrelated to the machine's CPU count. Unrelated.
>
> Jacob <17691150...@163.com> wrote on Sat, Jan 23, 2021 at 4:35 PM:
>
>> Thanks for the reply~
>>
>> My understanding was that the parallelism should not exceed the number of CPUs.
>>
>>
>>
>> -
>> Thanks!
>> Jacob
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>


Re: Re: Flink parallelism question

2021-01-23 Thread Jacob
I see now... Thanks!! (Lately I keep falling into these basic misconceptions.) The more I think about it, the sillier this question feels.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Unsubscribe

2021-01-23 Thread Shawn Huang
Hi,

To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org

See https://flink.apache.org/zh/community.html#section-1 for reference.

Best,
Shawn Huang


Natasha <13631230...@163.com> wrote on Fri, Jan 22, 2021 at 5:04 PM:

>
>
> Unsubscribe


Flink SQL 1.12 query on a Hive table finishes almost immediately; No more splits available

2021-01-23 Thread 赵一旦
The SQL is very simple:

select * from test2
where `dt`=20210124 and `hour`=02 and `supply_id`=2027
limit 1000;

After submission the job finishes almost immediately and returns no data at all.

But the data does exist; the same statement returns data when run through spark-sql.

The JM and TM logs contain "No more splits available".

So far it looks like not a single split is generated. This should be the new source API in 1.12.

I'm not sure whether this is a bug or whether there is some usage caveat.
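
(A side note, based only on the follow-up message further down in this digest: string partition values apparently need to be quoted. Which of these partition columns are actually strings is not confirmed in the thread, so this is only a sketch of the corrected query.)

select * from test2
where `dt`='20210124' and `hour`='02' and `supply_id`=2027
limit 1000;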


Re: Flink SQL 1.12 query on a Hive table finishes almost immediately; No more splits available

2021-01-23 Thread 赵一旦
A follow-up: (1) In Flink SQL queries, string partition fields apparently must be quoted with '', otherwise nothing is found? A predicate like hour=02 above directly leads to "no more splits".
(2) With that fixed, splits are now produced, but an ORC-related error is thrown, and submitting the SQL causes the JM to fail outright. The JM log is as follows:

2021-01-24 04:41:24,952 ERROR
org.apache.flink.runtime.util.FatalExitExceptionHandler  [] -
FATAL: Thread 'flink-akka.actor.default-dispatcher-2' produced an
uncaught exception. Stopping the process...

java.util.concurrent.CompletionException:
org.apache.flink.util.FlinkRuntimeException: Failed to start the
operator coordinators
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:722)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
~[?:1.8.0_251]
at 
org.apache.flink.runtime.jobmaster.JobMaster.resetAndStartScheduler(JobMaster.java:935)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:801)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:357)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:383)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dists-extended_2.11-1.12.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dists-extended_2.11-1.12.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dists-extended_2.11-1.12.0.jar:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to
start the operator coordinators
at 
org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1100)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:567)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:944)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
~[?:1.8.0_251]
... 27 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.hadoop.hive.common.ValidReadTxnList.readFromString(ValidReadTxnList.java:142) ~[?:?]
at org.apache.hadoop.hive.common.ValidReadTxnList.<init>(ValidReadTxnList.java:57) ~[?:?]
at org.apache.hadoop.hiv

Re: Flink SQL 1.12 query on a Hive table finishes almost immediately; No more splits available

2021-01-23 Thread 赵一旦
In addition, I found that the Parquet format works. I also had a look at the FileSink's Parquet bulk format among the Flink streaming connectors.
The docs mention ParquetAvroWriters: how should the corresponding Hive table be created for files written in that format? With a plain "stored as parquet" table there does not seem to be any Avro information.

赵一旦  wrote on Sun, Jan 24, 2021 at 6:45 AM:

> A follow-up: (1) In Flink SQL queries, string partition fields apparently must be quoted with '', otherwise nothing is found? A predicate like hour=02 above directly leads to "no more splits".
> (2) With that fixed, splits are now produced, but an ORC-related error is thrown, and submitting the SQL causes the JM to fail outright. The JM log is as follows:
>
> 2021-01-24 04:41:24,952 ERROR 
> org.apache.flink.runtime.util.FatalExitExceptionHandler  [] - FATAL: 
> Thread 'flink-akka.actor.default-dispatcher-2' produced an uncaught 
> exception. Stopping the process...
>
> [...]

Flink SQL reading an ORC table: submitting the SQL makes the JobManager process throw an error and exit

2021-01-23 Thread 赵一旦
Could someone help analyze this? Judging from the logs and my experiments, it is related to ORC.

The SQL is very simple; any arbitrary select triggers it.
The exception log is as follows:

2021-01-24 04:41:24,952 ERROR
org.apache.flink.runtime.util.FatalExitExceptionHandler  [] -
FATAL: Thread 'flink-akka.actor.default-dispatcher-2' produced an
uncaught exception. Stopping the process...

java.util.concurrent.CompletionException:
org.apache.flink.util.FlinkRuntimeException: Failed to start the
operator coordinators
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:722)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
~[?:1.8.0_251]
at 
org.apache.flink.runtime.jobmaster.JobMaster.resetAndStartScheduler(JobMaster.java:935)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:801)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:357)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:383)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dists-extended_2.11-1.12.0.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dists-extended_2.11-1.12.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dists-extended_2.11-1.12.0.jar:?]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dists-extended_2.11-1.12.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dists-extended_2.11-1.12.0.jar:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to
start the operator coordinators
at 
org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1100)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:567)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:944)
~[flink-dists-extended_2.11-1.12.0.jar:?]
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
~[?:1.8.0_251]
... 27 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.hadoop.hive.common.ValidReadTxnList.readFromString(ValidReadTxnList.java:142) ~[?:?]
at org.apache.hadoop.hive.common.ValidReadTxnList.<init>(ValidReadTxnList.java:57) ~[?:?]
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$Context.<init>(OrcInputFormat.java:421) ~[?:?]
at

Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V

2021-01-23 Thread 董海峰(Sharp)
Hi, I recently ran into a problem. I posted it to the community but got no answer, so I'd like to ask you directly; please reply when you have time, thank you.
hadoop 3.3.0, flink 1.12, hive 3.1.2
I want to integrate Hive and Flink. After I configure the sql-client-defaults.yaml file,

catalogs:
  - name: default_catalog
    type: hive
    hive-conf-dir: /cdc/apache-hive-3.1.2-bin/conf

I start the flink sql client, but the following error is reported.
[root@dhf4 bin]# ./sql-client.sh embedded
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/cdc/flink-1.12.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/cdc/hadoop-3.3.0/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
No default environment specified.
Searching for '/cdc/flink-1.12.0/conf/sql-client-defaults.yaml'...found.
Reading default environment from: 
file:/cdc/flink-1.12.0/conf/sql-client-defaults.yaml
No session environment specified.
2021-01-20 10:12:38,179 INFO  org.apache.hadoop.hive.conf.HiveConf  
   [] - Found configuration file 
file:/cdc/apache-hive-3.1.2-bin/conf/hive-site.xml
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could 
not create execution context.
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
Caused by: java.lang.NoSuchMethodError: 
com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
at org.apache.hadoop.conf.Configuration.set(Configuration.java:1380)
at org.apache.hadoop.conf.Configuration.set(Configuration.java:1361)
at org.apache.hadoop.mapred.JobConf.setJar(JobConf.java:536)
at org.apache.hadoop.mapred.JobConf.setJarByClass(JobConf.java:554)
at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:448)
at org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:5141)
at org.apache.hadoop.hive.conf.HiveConf.<init>(HiveConf.java:5109)
at 
org.apache.flink.table.catalog.hive.HiveCatalog.createHiveConf(HiveCatalog.java:211)
at org.apache.flink.table.catalog.hive.HiveCatalog.<init>(HiveCatalog.java:164)
at 
org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory.createCatalog(HiveCatalogFactory.java:89)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:384)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$5(ExecutionContext.java:634)
at java.util.HashMap.forEach(HashMap.java:1289)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:633)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:266)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:632)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)
at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:185)
at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:138)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)
... 3 more

The log content is as follows
[root@dhf4 bin]# cat ../log/flink-root-sql-client-dhf4.log
2021-01-20 10:12:36,246 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: jobmanager.rpc.address, localhost
2021-01-20 10:12:36,252 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: jobmanager.rpc.port, 6123
2021-01-20 10:12:36,252 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: jobmanager.memory.process.size, 1600m
2021-01-20 10:12:36,252 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: taskmanager.memory.process.size, 1728m
2021-01-20 10:12:36,252 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: taskmanager.numberOfTaskSlots, 1
2021-01-20 10:12:36,256 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: parallelism.default, 1
2021-01-20

PyFlink 1.12.1 has no Python 3.8 installation file

2021-01-23 Thread macdoor
On Linux with Python 3.8 I cannot install PyFlink 1.12.1; the highest installable version is 1.12.0. Looking at the available files at https://pypi.org/project/apache-flink/#files, for Python 3.8 there is only one 1.12.1 file: apache_flink-1.12.1-cp38-cp38-macosx_10_9_x86_64.whl.

By contrast, PyFlink 1.12.0 has two files for Python 3.8: apache_flink-1.12.0-cp38-cp38-manylinux1_x86_64.whl and apache_flink-1.12.0-cp38-cp38-macosx_10_9_x86_64.whl.

Are the PyFlink 1.12.1 installation files incomplete?




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: PyFlink 1.12.1 has no Python 3.8 installation file

2021-01-23 Thread Xintong Song
They are indeed incomplete at the moment. Flink's project storage quota on PyPI is full, so the Python 3.8 Linux package could not be uploaded. We have contacted PyPI to increase the project's quota, but the PyPI review process is slow and the increase has not been completed yet.

This is also mentioned in the 1.12.1 release notes [1].

>
> - The source and Python 3.8 Linux wheel packages for Apache Flink 1.12.1 are
>   temporarily missing on PyPI, due to the project space limit. The request for
>   increasing the space limit is currently under the PyPI review process. During
>   this time, you can build the package manually if needed.
>
Thank you~

Xintong Song


[1] https://flink.apache.org/news/2021/01/19/release-1.12.1.html



On Sun, Jan 24, 2021 at 9:12 AM macdoor  wrote:

> On Linux with Python 3.8 I cannot install PyFlink 1.12.1; the highest installable
> version is 1.12.0. Looking at the available files at
> https://pypi.org/project/apache-flink/#files, for Python 3.8 there is only one 1.12.1
> file: apache_flink-1.12.1-cp38-cp38-macosx_10_9_x86_64.whl.
>
> By contrast, PyFlink 1.12.0 has two files for Python 3.8:
> apache_flink-1.12.0-cp38-cp38-manylinux1_x86_64.whl and
> apache_flink-1.12.0-cp38-cp38-macosx_10_9_x86_64.whl.
>
> Are the PyFlink 1.12.1 installation files incomplete?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


In Flink on YARN deployment mode, can YARN use a custom log aggregation strategy instead of aggregating only after the job finishes?

2021-01-23 Thread Bobby
In Flink on YARN deployment mode, I found that YARN only aggregates the TM and JM logs into HDFS after the job terminates. This is very inconvenient for troubleshooting via logs in production.
Is there a way to customize the log aggregation strategy, e.g. aggregating once every fixed interval?
Or, how do you all query the logs of your Flink programs in real time in daily use?
Thanks.
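
(One possible knob, offered only as an assumption and not taken from this thread: YARN's rolling log aggregation for long-running applications, configured in yarn-site.xml on the NodeManagers. Whether the Hadoop version in use supports it needs to be verified.)

<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
<property>
  <!-- aggregate the logs of still-running applications periodically, e.g. roughly hourly -->
  <name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
  <value>3600</value>
</property>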





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: PyFlink 1.12.1 has no Python 3.8 installation file

2021-01-23 Thread macdoor
Thanks! Sorry for not reading the docs carefully. Is there anywhere I can download a pre-built PyFlink 1.12.1 for Python 3.8 on Linux? I don't quite trust a build I do myself.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: PyFlink 1.12.1 has no Python 3.8 installation file

2021-01-23 Thread Xintong Song
It can also be downloaded from the official Apache site.

https://dist.apache.org/repos/dist/release/flink/flink-1.12.1/python/


Thank you~

Xintong Song


On Sun, Jan 24, 2021 at 11:39 AM macdoor  wrote:

> Thanks! Sorry for not reading the docs carefully. Is there anywhere I can download a
> pre-built PyFlink 1.12.1 for Python 3.8 on Linux? I don't quite trust a build I do myself.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/