Re: FlinkSQL ES7连接器无法使用

2021-11-22 文章 Leonard Xu
这是个依赖问题,你检查下你环境中是否只使用sql connector 的jar,即 flink-sql-connector-elasticsearch7, 
如果不是 datastream 作业是不需要 flink-connector-elasticsearch7 这个 
jar包的。如果不是这个问题,你可以分析下你作业里使用的 es 相关依赖,可以参考异常栈确定类再去确定jar包,看下是不是多加了一些无用的jar。

祝好,
Leonard
 

> 在 2021年11月22日,12:30,mispower  写道:
> 
> 你好,咨询一下后续你这个问题是如何解决的?
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> At 2021-06-10 10:15:58, "mokaful" <649713...@qq.com> wrote:
>> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
>> instantiate user function.
>>  at
>> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:338)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>  at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:653)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>  at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:626)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>  at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>  at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>  at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>  at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>  at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>  at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:616)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>  at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:566)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>  at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:181)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>  at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
>> Caused by: java.io.InvalidClassException:
>> org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSink$AuthRestClientFactory;
>> local class incompatible: stream classdesc serialVersionUID =
>> -2564582543942331131, local class serialVersionUID = -2353232579685349916
>>  at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>> ~[?:1.8.0_181]
>>  at 
>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>> ~[?:1.8.0_181]
>>  at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>> ~[?:1.8.0_181]
>>  at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
>> ~[?:1.8.0_181]
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>> ~[?:1.8.0_181]
>>  at 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>> ~[?:1.8.0_181]
>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>> ~[?:1.8.0_181]
>>  at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>> ~[?:1.8.0_181]
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>> ~[?:1.8.0_181]
>>  at 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>> ~[?:1.8.0_181]
>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>> ~[?:1.8.0_181]
>>  at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>> ~[?:1.8.0_181]
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>> ~[?:1.8.0_181]
>>  at 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>> ~[?:1.8.0_181]
>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>> ~[?:1.8.0_181]
>>  at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>> ~[?:1.8.0_181]
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>> ~[?:1.8.0_181]
>>  at 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>> ~[?:1.8.0_181]
>>

Re: 如何实现event triggered window?

2021-11-22 文章 Tony Wei
Hi Pinjie,

如果是需要 event triggered 的累計統計更新的話,可以考慮使用 SQL over aggregation
[1]。例如文件中提供的如下範例,計算當前 row  往前一小時內的加總結果。

> SELECT order_id, order_time, amount,
>   SUM(amount) OVER (
> PARTITION BY product
> ORDER BY order_time
> RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
>   ) AS one_hour_prod_amount_sumFROM Orders
>
> 但是這種作法只能根據收到的事件來觸發,無法根據處理時間。換句話說,如果 t=X 沒有數據進來的話,就不會有 t=(X-1) ~ X 的累計統計輸出。
考慮更複雜的情況需要結合事件和處理時間來觸發的話,需要透過 Process Function API 或者用 DataStream API 自定義
Trigger 的方式實現。

best regards,

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/over-agg/

tison  於 2021年11月23日 週二 下午2:03寫道:

> 如果你是想每时每刻(实际上开销很大,假设是每 1 分钟),那就用 Sliding Window
>
> Best,
> tison.
>
>
> tison  于2021年11月23日周二 下午2:00写道:
>
> > 你的理解里就是必须整点对齐嘛,那其实是可以加个 offset 不整点对齐的捏。
> >
> > Best,
> > tison.
> >
> >
> > tison  于2021年11月23日周二 下午1:59写道:
> >
> >>
> >>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/
> >>
> >> 你可以关注一下这里有个 offset 参数,这样就能用 tumbling window 了 =。=
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> Pinjie Huang  于2021年11月23日周二
> 下午1:18写道:
> >>
> >>> Hi Yidan,
> >>>
> >>> Tumbling window 只有
> >>> t=0~1h
> >>> t=1~2h
> >>> 等等的window
> >>>
> >>> 我现在需要在 t=X 时刻,得到 t=(X-1) ~ X 的window 比如
> >>> t=1.5h 时刻 我需要 t=0.5~1.5h 这个window
> >>>
> >>> On Tue, Nov 23, 2021 at 12:32 PM yidan zhao 
> wrote:
> >>>
> >>> > 其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。
> >>> >
> >>> > zhiyuan su  于2021年11月22日周一 下午4:59写道:
> >>> >
> >>> > > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足
> >>> > >
> >>> > >
> >>> >
> >>>
> https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL&page=1
> >>> > > 具体在第2章第一节
> >>> > >
> >>> > > Pinjie Huang  于2021年11月22日周一
> >>> > 下午3:52写道:
> >>> > >
> >>> > > > Hi friends,
> >>> > > >
> >>> > > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event
> triggerred。
> >>> > > >
> >>> > > > 比如说 想知道过去1小时event A trigger的次数,
> >>> > > >
> >>> > > > 如果使用tumbling window和1h window
> >>> > > > |1h | 1h |
> >>> > > > t=0
> >>> > > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。
> >>> > > >
> >>> > > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。
> >>> > > >
> >>> > > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window?
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>>
> >>> --
> >>> Thanks,
> >>> Pinjie Huang
> >>>
> >>
>


Re: 如何实现event triggered window?

2021-11-22 文章 tison
如果你是想每时每刻(实际上开销很大,假设是每 1 分钟),那就用 Sliding Window

Best,
tison.


tison  于2021年11月23日周二 下午2:00写道:

> 你的理解里就是必须整点对齐嘛,那其实是可以加个 offset 不整点对齐的捏。
>
> Best,
> tison.
>
>
> tison  于2021年11月23日周二 下午1:59写道:
>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/
>>
>> 你可以关注一下这里有个 offset 参数,这样就能用 tumbling window 了 =。=
>>
>> Best,
>> tison.
>>
>>
>> Pinjie Huang  于2021年11月23日周二 下午1:18写道:
>>
>>> Hi Yidan,
>>>
>>> Tumbling window 只有
>>> t=0~1h
>>> t=1~2h
>>> 等等的window
>>>
>>> 我现在需要在 t=X 时刻,得到 t=(X-1) ~ X 的window 比如
>>> t=1.5h 时刻 我需要 t=0.5~1.5h 这个window
>>>
>>> On Tue, Nov 23, 2021 at 12:32 PM yidan zhao  wrote:
>>>
>>> > 其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。
>>> >
>>> > zhiyuan su  于2021年11月22日周一 下午4:59写道:
>>> >
>>> > > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足
>>> > >
>>> > >
>>> >
>>> https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL&page=1
>>> > > 具体在第2章第一节
>>> > >
>>> > > Pinjie Huang  于2021年11月22日周一
>>> > 下午3:52写道:
>>> > >
>>> > > > Hi friends,
>>> > > >
>>> > > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event triggerred。
>>> > > >
>>> > > > 比如说 想知道过去1小时event A trigger的次数,
>>> > > >
>>> > > > 如果使用tumbling window和1h window
>>> > > > |1h | 1h |
>>> > > > t=0
>>> > > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。
>>> > > >
>>> > > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。
>>> > > >
>>> > > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window?
>>> > > >
>>> > >
>>> >
>>>
>>>
>>> --
>>> Thanks,
>>> Pinjie Huang
>>>
>>


Re: 如何实现event triggered window?

2021-11-22 文章 tison
你的理解里就是必须整点对齐嘛,那其实是可以加个 offset 不整点对齐的捏。

Best,
tison.


tison  于2021年11月23日周二 下午1:59写道:

>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/
>
> 你可以关注一下这里有个 offset 参数,这样就能用 tumbling window 了 =。=
>
> Best,
> tison.
>
>
> Pinjie Huang  于2021年11月23日周二 下午1:18写道:
>
>> Hi Yidan,
>>
>> Tumbling window 只有
>> t=0~1h
>> t=1~2h
>> 等等的window
>>
>> 我现在需要在 t=X 时刻,得到 t=(X-1) ~ X 的window 比如
>> t=1.5h 时刻 我需要 t=0.5~1.5h 这个window
>>
>> On Tue, Nov 23, 2021 at 12:32 PM yidan zhao  wrote:
>>
>> > 其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。
>> >
>> > zhiyuan su  于2021年11月22日周一 下午4:59写道:
>> >
>> > > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足
>> > >
>> > >
>> >
>> https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL&page=1
>> > > 具体在第2章第一节
>> > >
>> > > Pinjie Huang  于2021年11月22日周一
>> > 下午3:52写道:
>> > >
>> > > > Hi friends,
>> > > >
>> > > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event triggerred。
>> > > >
>> > > > 比如说 想知道过去1小时event A trigger的次数,
>> > > >
>> > > > 如果使用tumbling window和1h window
>> > > > |1h | 1h |
>> > > > t=0
>> > > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。
>> > > >
>> > > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。
>> > > >
>> > > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window?
>> > > >
>> > >
>> >
>>
>>
>> --
>> Thanks,
>> Pinjie Huang
>>
>


Re: 如何实现event triggered window?

2021-11-22 文章 yidan zhao
哦哦,懂了,那还有另一个问题。你什么时候需要知道呢?
是只有半小时时刻吗,还是随时随刻都可能,如果是随时随刻都希望能拿到过去1h,这个本身就没意义,比如1s分成1000ms,每个ms你都希望拿到过去1h的数据,只能按照sliding
window做,而且这个性能消耗很高,取决于你究竟多久需要拿到一次。
如果你只是固定的不希望用0-1,1-2,而是需要0.5-1.5,1.5-2.5这样的话使用offset就可以实现。

Pinjie Huang  于2021年11月23日周二 下午1:18写道:

> Hi Yidan,
>
> Tumbling window 只有
> t=0~1h
> t=1~2h
> 等等的window
>
> 我现在需要在 t=X 时刻,得到 t=(X-1) ~ X 的window 比如
> t=1.5h 时刻 我需要 t=0.5~1.5h 这个window
>
> On Tue, Nov 23, 2021 at 12:32 PM yidan zhao  wrote:
>
> > 其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。
> >
> > zhiyuan su  于2021年11月22日周一 下午4:59写道:
> >
> > > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足
> > >
> > >
> >
> https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL&page=1
> > > 具体在第2章第一节
> > >
> > > Pinjie Huang  于2021年11月22日周一
> > 下午3:52写道:
> > >
> > > > Hi friends,
> > > >
> > > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event triggerred。
> > > >
> > > > 比如说 想知道过去1小时event A trigger的次数,
> > > >
> > > > 如果使用tumbling window和1h window
> > > > |1h | 1h |
> > > > t=0
> > > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。
> > > >
> > > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。
> > > >
> > > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window?
> > > >
> > >
> >
>
>
> --
> Thanks,
> Pinjie Huang
>


Re: 如何实现event triggered window?

2021-11-22 文章 tison
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/

你可以关注一下这里有个 offset 参数,这样就能用 tumbling window 了 =。=

Best,
tison.


Pinjie Huang  于2021年11月23日周二 下午1:18写道:

> Hi Yidan,
>
> Tumbling window 只有
> t=0~1h
> t=1~2h
> 等等的window
>
> 我现在需要在 t=X 时刻,得到 t=(X-1) ~ X 的window 比如
> t=1.5h 时刻 我需要 t=0.5~1.5h 这个window
>
> On Tue, Nov 23, 2021 at 12:32 PM yidan zhao  wrote:
>
> > 其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。
> >
> > zhiyuan su  于2021年11月22日周一 下午4:59写道:
> >
> > > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足
> > >
> > >
> >
> https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL&page=1
> > > 具体在第2章第一节
> > >
> > > Pinjie Huang  于2021年11月22日周一
> > 下午3:52写道:
> > >
> > > > Hi friends,
> > > >
> > > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event triggerred。
> > > >
> > > > 比如说 想知道过去1小时event A trigger的次数,
> > > >
> > > > 如果使用tumbling window和1h window
> > > > |1h | 1h |
> > > > t=0
> > > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。
> > > >
> > > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。
> > > >
> > > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window?
> > > >
> > >
> >
>
>
> --
> Thanks,
> Pinjie Huang
>


Re: 如何实现event triggered window?

2021-11-22 文章 Pinjie Huang
Hi Yidan,

Tumbling window 只有
t=0~1h
t=1~2h
等等的window

我现在需要在 t=X 时刻,得到 t=(X-1) ~ X 的window 比如
t=1.5h 时刻 我需要 t=0.5~1.5h 这个window

On Tue, Nov 23, 2021 at 12:32 PM yidan zhao  wrote:

> 其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。
>
> zhiyuan su  于2021年11月22日周一 下午4:59写道:
>
> > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足
> >
> >
> https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL&page=1
> > 具体在第2章第一节
> >
> > Pinjie Huang  于2021年11月22日周一
> 下午3:52写道:
> >
> > > Hi friends,
> > >
> > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event triggerred。
> > >
> > > 比如说 想知道过去1小时event A trigger的次数,
> > >
> > > 如果使用tumbling window和1h window
> > > |1h | 1h |
> > > t=0
> > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。
> > >
> > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。
> > >
> > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window?
> > >
> >
>


-- 
Thanks,
Pinjie Huang


Re: 如何实现event triggered window?

2021-11-22 文章 yidan zhao
其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。

zhiyuan su  于2021年11月22日周一 下午4:59写道:

> 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足
>
> https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL&page=1
> 具体在第2章第一节
>
> Pinjie Huang  于2021年11月22日周一 下午3:52写道:
>
> > Hi friends,
> >
> > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event triggerred。
> >
> > 比如说 想知道过去1小时event A trigger的次数,
> >
> > 如果使用tumbling window和1h window
> > |1h | 1h |
> > t=0
> > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。
> >
> > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。
> >
> > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window?
> >
>


flink1.12 ??????????????state.checkpoints.num-retained????????????

2021-11-22 文章 ????????
   
??flink1.12??flink-conf.yaml??state.checkpoints.num-retained: 
3checkpoint??1checkpoint??on yarn

Re: jdk11创建hive catalog抛错

2021-11-22 文章 Caizhi Weng
Hi!

这是从 hive 里产生的错误。据我所知,hive 对 Java 11 的支持仍在建设中 [1],因此还是建议使用 Java 8。

[1] https://issues.apache.org/jira/browse/HIVE-22415

aiden <18765295...@163.com> 于2021年11月22日周一 下午12:00写道:

> 求助,jdk从8升级到11后使用hive作为flink
> table的catalog抛错,排查是bsTableEnv.registerCatalog(catalogName, catalog)
> 抛错,具体异常为:
> 11:55:22.343 [main] ERROR hive.log - Got exception:
> java.lang.ClassCastException class [Ljava.lang.Object; cannot be cast to
> class [Ljava.net.URI; ([Ljava.lang.Object; and [Ljava.net.URI; are in
> module java.base of loader 'bootstrap')
> java.lang.ClassCastException: class [Ljava.lang.Object; cannot be cast to
> class [Ljava.net.URI; ([Ljava.lang.Object; and [Ljava.net.URI; are in
> module java.base of loader 'bootstrap')
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:274)
> [hive-exec-2.1.1.jar:2.1.1]
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:210)
> [hive-exec-2.1.1.jar:2.1.1]
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method) ~[?:?]
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> [?:?]
> at
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> [?:?]
> at
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> [?:?]
> at
> org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1652)
> [hive-exec-2.1.1.jar:2.1.1]
> at
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(RetryingMetaStoreClient.java:80)
> [hive-exec-2.1.1.jar:2.1.1]
> at
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:130)
> [hive-exec-2.1.1.jar:2.1.1]
> at
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:115)
> [hive-exec-2.1.1.jar:2.1.1]
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method) ~[?:?]
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:?]
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:?]
> at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
> at
> org.apache.flink.table.catalog.hive.client.HiveShimV200.getHiveMetastoreClient(HiveShimV200.java:54)
> [flink-connector-hive_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:277)
> [flink-connector-hive_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:78)
> [flink-connector-hive_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:68)
> [flink-connector-hive_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:32)
> [flink-connector-hive_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:296)
> [flink-connector-hive_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:195)
> [flink-table-api-java-1.14.0.jar:1.14.0]
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:373)
> [flink-table-api-java-1.14.0.jar:1.14.0]
> at catalogTest.FlinkExecTableRun.flinkMain(FlinkExecTableRun.java:27)
> [classes/:?]
> at catalogTest.test.main(test.java:11) [classes/:?]
> 11:55:22.348 [main] ERROR hive.log - Converting exception to MetaException
> Exception in thread "main"
> org.apache.flink.table.catalog.exceptions.CatalogException: Failed to
> create Hive Metastore client
> at
> org.apache.flink.table.catalog.hive.client.HiveShimV200.getHiveMetastoreClient(HiveShimV200.java:61)
> at
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:277)
> at
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:78)
> at
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:68)
> at
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:32)
> at
> org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:296)
> at
> org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:195)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:373)
> at catalogTest.FlinkExecTableRun.flinkMain(FlinkExecTableRun.java:27)
> at catalogTest.test.main(test.java:11)
> Caused by: java.lang.reflect.InvocationTargetException

关于flink plugins目录不生效的疑问

2021-11-22 文章 RS
Hi,

环境:flink-1.14.0,单节点standalone



https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/

参考官方文档,执行下面命令:

mkdir plugins/s3-fs-hadoop

cp opt/flink-s3-fs-hadoop-1.14.0.jar plugins/s3-fs-hadoop/



在flink-conf中配置了hadoop的路径(s3使用了hadoop的配置文件)

env.hadoop.conf.dir: /data/hadoop/s3

然后启动集群,成功启动后,执行./bin/sql-client.sh,用SQL测试读取s3数据, 出现连接超时报错



但是把flink-s3-fs-hadoop-1.14.0.jar挪到flink的lib下,重启集群,重新执行同样的测试,就可以读取到数据了,其余的配置都没有修改
所以感觉这个plugins目录没有生效?这个plugins和lib目录的区别在哪里,应该如何使用??


附上报错信息:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.util.SerializedThrowable: connect timed out


Caused by: com.amazonaws.SdkClientException: Failed to connect to service 
endpoint: 
at 
com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:100)
at 
com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:70)
at 
com.amazonaws.internal.InstanceMetadataServiceResourceFetcher.readResource(InstanceMetadataServiceResourceFetcher.java:75)
at 
com.amazonaws.internal.EC2ResourceFetcher.readResource(EC2ResourceFetcher.java:66)
at 
com.amazonaws.auth.InstanceMetadataServiceCredentialsFetcher.getCredentialsEndpoint(InstanceMetadataServiceCredentialsFetcher.java:58)
at 
com.amazonaws.auth.InstanceMetadataServiceCredentialsFetcher.getCredentialsResponse(InstanceMetadataServiceCredentialsFetcher.java:46)
at 
com.amazonaws.auth.BaseCredentialsFetcher.fetchCredentials(BaseCredentialsFetcher.java:112)
at 
com.amazonaws.auth.BaseCredentialsFetcher.getCredentials(BaseCredentialsFetcher.java:68)
at 
com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:165)
at 
org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:137)
... 49 more
Caused by: java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at 
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
at sun.net.www.http.HttpClient.(HttpClient.java:242)
at sun.net.www.http.HttpClient.New(HttpClient.java:339)
at sun.net.www.http.HttpClient.New(HttpClient.java:357)
at 
sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226)
at 
sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1205)
at 
sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
at 
sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
at 
com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:52)
at 
com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:80)
... 58 more



flink1.12 请教下如何配置多hadoop参数,s3使用问题

2021-11-22 文章 RS
hi,


环境:
1. flink-1.12,版本可以升级
2. flink-conf中配置了env.hadoop.conf.dir,路径下有hdfs集群的core-site.xml和hdfs-site.xml, 
state.backend保存在该HDFS上
3. flink的部署模式是K8S+session


需求:
需要从一个s3协议的分布式文件系统中读取文件,处理完写到mysql中


问题:
s3配置采用hadoop的配置方式,保存为一个新的core-site.xml文件,参考的 
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A


按照官网说明文档中,需要 修改hadoop的环境变量,但是就和以前的core-site.xml冲突了,无法同时配置2个hadoop路径
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/


或者 在flink-conf.yaml中添加一堆s3配置,这样又写死了,再新增一个s3集群的时候如何处理?


所以请教下如何解决这类问题(可以修改代码)?如何配置多个hadoop配置(比如从第一个文件系统(s3协议)读数据,写到第二个文件系统中(s3协议))?





?????? ??????????????downloads/setup-pyflink-virtual-env.sh????

2021-11-22 文章 Asahi Lee
??venv.zip??1.14.0??




--  --
??: 
   "user-zh"

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/python_config/#python-client-executable
>
> 
??1.14.0??-pyclientexec 
venv.zip/venv/bin/python
>
>
> On Fri, Nov 19, 2021 at 10:48 AM Asahi Lee 
<978466...@qq.com.invalid>
> wrote:
>
> > ??source
> 
my_env/bin/activate??PYFLINK_CLIENT_EXECUTABLE??
> > jobmanagerNo module named 
pyflinkjobmanageryarn
> > ??
> >
> >
> > &gt; LogType:jobmanager.out
> > &gt; Log Upload Time:?? ?? 18 20:48:45 +0800 2021
> > &gt; LogLength:37
> > &gt; Log Contents:
> > &gt; /bin/python: No module named pyflink
> >
> >
> >
> >
> > 
--&nbsp;&nbsp;--
> > ??:
> 
>                                                  
> "user-zh"
> 
>                                                                    
> <
> > dian0511...@gmail.com&gt;;
> > :&nbsp;2021??11??19??(??) 9:38
> > 
??:&nbsp;"user-zh"https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#python-interpreter-of-client
> >
> 
;
> ;
> > On Thu, Nov 18, 2021 at 9:00 PM Asahi Lee 
<978466...@qq.com.invalid
> &gt;
> > wrote:
> >
> > &gt; Hi !
> > &gt; &amp;nbsp; &amp;nbsp;java Table 
api??python udf
> > &gt; 
pythonjm??/bin/python:
> No module
> > named
> > &gt; pyflink??
> > &gt;
> > &gt;
> > &gt; ./flink-1.13.2/bin/flink&amp;nbsp;
> > &gt; run-application -t yarn-application&amp;nbsp;
> > &gt;
> >
> 
-Dyarn.provided.lib.dirs="hdfs://nameservice1/user/flink/flinklib"&amp;nbsp;
> > &gt; -Dyarn.application.queue=d
> > &gt; -p 1&amp;nbsp;
> > &gt; -pyarch /opt/venv.zip
> > &gt; -pyexec venv.zip/venv/bin/python&amp;nbsp;
> > &gt; -pyfs /opt/test.py&amp;nbsp;
> > &gt; -c test.PyUDFTest&amp;nbsp;
> > &gt; /opt/flink-python-test-1.0-SNAPSHOT.jar
> > &gt;
> > &gt;
> > &gt;
> > &gt; ??
> > &gt; Caused by: java.lang.RuntimeException: Python 
callback
> server start
> > failed!
> > &gt; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp;
> at
> > &gt;
> >
> 
org.apache.flink.client.python.PythonFunctionFactory.createPythonFunctionFactory(PythonFunctionFactory.java:167)
> > &gt; ~[flink-python_2.11-1.13.2.jar:1.13.2]
> > &gt; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp;
> at
> > &gt;
> >
> 
org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:88)
> > &gt; ~[flink-python_2.11-1.13.2.jar:1.13.2]
> > &gt; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp;
> at
> > &gt;
> >
> 
org.apache.flink.client.python.PythonFunctionFactory$1.load(PythonFunctionFactory.java:84)
> > &gt; ~[flink-python_2.11-1.13.2.jar:1.13.2]
> > &gt; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp;
> at
> > &gt;
> >
> 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
> > &gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> > &gt; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp;
> at
> > &gt;
> >
> 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
> > &gt; ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> > &gt; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp;
> at
> > &gt;
> >
> 
org.apache.flink

询问下Flink分布式缓存的问题

2021-11-22 文章 anig kus
Flink分布式缓存怎么没有效果呢,这里不好贴代码,可以看这个链接下
https://issues.apache.org/jira/browse/FLINK-24973
麻烦解答下,是我用的不对还是理解不对.


Re: 回复:flink1.13.1 sql client connect hivecatalog 报错

2021-11-22 文章 zhiyuan su
感谢,我已经解决了。
更换了jdk 版本,重新替换了插件包

RS  于2021年11月22日周一 下午1:44写道:

> 图片看不到的,尽量不要发图片,你可以复制文字出来并说明下,
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-11-22 13:14:13,"zhiyuan su"  写道:
>
> 我使用的是上面的jar 包。从1.13的文档处获取的,但维标注flink 版本,我理解应该是flink1.13版本编译的。
>
>
>
> 这个是yaml文件,我直接在sql 客户端,通过DDL 的方式去编写的话,也是如下报错:
> Caused by: java.util.ServiceConfigurationError:
> org.apache.flink.table.factories.Factory:
> org.apache.flink.table.module.hive.HiveModuleFactory not a subtype
>


Re: 如何实现event triggered window?

2021-11-22 文章 zhiyuan su
感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足
https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL&page=1
具体在第2章第一节

Pinjie Huang  于2021年11月22日周一 下午3:52写道:

> Hi friends,
>
> Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event triggerred。
>
> 比如说 想知道过去1小时event A trigger的次数,
>
> 如果使用tumbling window和1h window
> |1h | 1h |
> t=0
> 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。
>
> 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。
>
> 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window?
>