Re: In the ad business, how can some Spark logic be reimplemented in Flink, and is ProcessFunction needed?

2021-08-17 Thread yidan zhao
The logic is muddled; I can't follow your requirement. Is this for 大搜 (big search)?

张锴 wrote on Wed, Aug 18, 2021 at 10:26 AM:
>
> Requirement:
> We need to replace our current Spark program with Flink. While going through the logic there is
> one part I don't know how to implement in Flink; the Spark job runs in three-minute micro-batches.
> Details:
> Ad logs flow in the order ask -> bid -> show -> click. The requirement is to merge every other log
> type with the bid log to keep the bid data complete; the key is sessionid + Adid, treated as unique.
> Logic: Spark reads several log topics (including xxtopic) and formats them. After joinAll we get
> (string, pair). If pair.logType is 'bid' it is written straight to bidtopic; for any other type we
> look up the bid table cached in HBase. If a match is found (it may be a show or click ...) the
> records are merged and written to bidtopic.
> If no match is found, pair.n records the number of attempts and the record is written back to
> xxtopic. If after n > 10 attempts (about 30 minutes of looping) no bid has been matched, the record
> is written to bidtopic as-is; if n <= 10 and no bid is matched, n is incremented and the record goes
> back to xxtopic for the next batch.
> The limit of 10 attempts comes from the business side, i.e. a 30-minute cache; without it too much
> data would pile up in xxtopic. No computation is involved, only merging, and no deduplication: if
> three identical records are found for a key, all three are merged.
>
> How can this be implemented in Flink?


Flink job fails when triggering a checkpoint (not always reproducible): concurrent Map access exception.

2021-08-17 Thread yidan zhao
Below is the exception stack. I checked the task that had the problem; it contains two operators, A and B.
B is an async operator but currently has no state at all. A is a broadcast process operator (it takes a regular stream and a broadcast stream) and only uses broadcast state.

Could someone help analyze what causes this concurrent Map access problem?


2021-08-18 06:18:37
java.io.IOException: Could not perform checkpoint 575 for operator
ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel
(18/60)#0.
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
at 
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException:
Could not complete snapshot 575 for operator
ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel
(18/60)#0. Failure reason: Checkpoint was declined.
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
... 20 more
Caused by: java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
at java.util.HashMap$EntryIterator.next(HashMap.java:1479)
at java.util.HashMap$EntryIterator.next(HashMap.java:1477)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:260)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:234)
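
One common cause of a ConcurrentModificationException inside KryoSerializer.copy is that a record containing a java.util.HashMap is still being mutated by another thread (for example an async callback) after it has been emitted, so the task thread races with that mutation when it copies the record. This is only an assumption about the setup here, not a confirmed diagnosis; the sketch below, which uses a plain Map-typed record as a placeholder, shows the defensive-copy pattern that avoids sharing the mutable map across threads.

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class MapCopyingAsyncFunction
        extends RichAsyncFunction<Map<String, String>, Map<String, String>> {

    @Override
    public void asyncInvoke(Map<String, String> input,
                            ResultFuture<Map<String, String>> resultFuture) {
        CompletableFuture.runAsync(() -> {
            // Hypothetical enrichment performed on a non-Flink thread.
            input.put("enriched", "true");
            // Emit a fresh copy so later copies/serialization by the task thread
            // never observe a map that this thread keeps mutating.
            resultFuture.complete(Collections.singleton(new HashMap<>(input)));
        });
    }
}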
 

In the ad business, how can some Spark logic be reimplemented in Flink, and is ProcessFunction needed?

2021-08-17 Thread 张锴
Requirement:
We need to replace our current Spark program with Flink. While going through the logic there is
one part I don't know how to implement in Flink; the Spark job runs in three-minute micro-batches.
Details:
Ad logs flow in the order ask -> bid -> show -> click. The requirement is to merge every other log
type with the bid log to keep the bid data complete; the key is sessionid + Adid, treated as unique.
Logic: Spark reads several log topics (including xxtopic) and formats them. After joinAll we get
(string, pair). If pair.logType is 'bid' it is written straight to bidtopic; for any other type we
look up the bid table cached in HBase. If a match is found (it may be a show or click ...) the
records are merged and written to bidtopic.
If no match is found, pair.n records the number of attempts and the record is written back to
xxtopic. If after n > 10 attempts (about 30 minutes of looping) no bid has been matched, the record
is written to bidtopic as-is; if n <= 10 and no bid is matched, n is incremented and the record goes
back to xxtopic for the next batch.
The limit of 10 attempts comes from the business side, i.e. a 30-minute cache; without it too much
data would pile up in xxtopic. No computation is involved, only merging, and no deduplication: if
three identical records are found for a key, all three are merged.

How can this be implemented in Flink?
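
For what it's worth, one way to express the "wait up to 10 batches for a bid" loop in Flink is a KeyedProcessFunction keyed by sessionid + Adid, with pending non-bid records kept in state and a processing-time timer standing in for the three-minute batch. The sketch below is only an illustration of that idea, not a tested answer: the LogEvent POJO, its logType field, the mergeWith() helper, and the routing of output to the two topics are placeholders for whatever the real job uses.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class BidMergeFunction extends KeyedProcessFunction<String, LogEvent, LogEvent> {

    private static final long RETRY_INTERVAL_MS = 3 * 60 * 1000L; // one "batch"
    private static final int MAX_RETRIES = 10;                    // roughly 30 minutes in total

    private ValueState<LogEvent> bidState;      // the bid seen for this key, if any
    private ListState<LogEvent> pendingState;   // show/click/... still waiting for a bid
    private ValueState<Integer> retriesState;

    @Override
    public void open(Configuration parameters) {
        bidState = getRuntimeContext().getState(new ValueStateDescriptor<>("bid", LogEvent.class));
        pendingState = getRuntimeContext().getListState(new ListStateDescriptor<>("pending", LogEvent.class));
        retriesState = getRuntimeContext().getState(new ValueStateDescriptor<>("retries", Integer.class));
    }

    @Override
    public void processElement(LogEvent event, Context ctx, Collector<LogEvent> out) throws Exception {
        if ("bid".equals(event.logType)) {
            bidState.update(event);
            out.collect(event);                         // the bid itself goes to the bid topic
            for (LogEvent pending : pendingState.get()) {
                out.collect(pending.mergeWith(event));  // flush everything that was waiting
            }
            pendingState.clear();
        } else if (bidState.value() != null) {
            out.collect(event.mergeWith(bidState.value()));
        } else {
            pendingState.add(event);                    // no bid yet: park it and retry later
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + RETRY_INTERVAL_MS);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<LogEvent> out) throws Exception {
        if (bidState.value() != null || !pendingState.get().iterator().hasNext()) {
            return; // already flushed in processElement, or nothing pending for this key
        }
        int retries = retriesState.value() == null ? 1 : retriesState.value() + 1;
        if (retries >= MAX_RETRIES) {
            for (LogEvent pending : pendingState.get()) {
                out.collect(pending);                   // give up: emit unmatched records as-is
            }
            pendingState.clear();
            retriesState.clear();
        } else {
            retriesState.update(retries);
            ctx.timerService().registerProcessingTimeTimer(timestamp + RETRY_INTERVAL_MS);
        }
    }
}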


Re: PyFlink StreamingFileSink bulk-encoded format (Avro)

2021-08-17 Thread Dian Fu
Hi Kamil,

AFAIK, the Avro format is still not supported by StreamingFileSink in
the Python DataStream API. However, I guess you could convert the DataStream to
a Table [1] and then use all the connectors supported in Table &
SQL. In this case, you could use the FileSystem connector [2] and the Avro format [3]
for your requirements.

Regards,
Dian

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/datastream/intro_to_datastream_api/#emit-results-to-a-table--sql-sink-connector
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/
 

[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro/
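
Roughly, the approach in [1]-[3] boils down to registering the stream as a table and writing it through a filesystem table that uses the avro format; the DDL is the same whether it is submitted from Java or from PyFlink's TableEnvironment. The sketch below shows the idea in Java; the schema, the HDFS path, and the column name are placeholders, and the flink-sql-avro dependency is assumed to be on the classpath.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class AvroFileSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<String> words = env.fromElements("a", "b", "c"); // placeholder stream
        Table table = tEnv.fromDataStream(words);                   // atomic type becomes column f0

        // Filesystem connector + Avro format from the Table & SQL docs linked above.
        tEnv.executeSql(
            "CREATE TABLE avro_sink (" +
            "  f0 STRING" +
            ") WITH (" +
            "  'connector' = 'filesystem'," +
            "  'path' = 'hdfs:///tmp/avro-out'," +
            "  'format' = 'avro'" +
            ")");
        table.executeInsert("avro_sink");
    }
}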

> On Aug 17, 2021, at 4:13 PM, Kamil ty wrote:
> 
> Hello,
> 
> I'm trying to save my data stream to an Avro file on HDFS. In Flink 
> documentation I can only see explanations for Java/Scala. However, I can't 
> seem to find a way to do it in PyFlink. Is this possible to do in PyFlink 
> currently?
> 
> Kind Regards
> Kamil



Re: Flink taskmanager in crash loop

2021-08-17 Thread Yangze Guo
> 2021-08-16 15:58:13.986 [Cancellation Watchdog for Source: MASKED] ERROR 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Fatal > error 
> occurred while executing the TaskManager. Shutting it down...
> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully 
> within 180 + seconds.

It seems the Task 'MASKED' cannot be terminated within the timeout. I
think this is the root cause of the TaskManager's termination. We
need to find out why Task 'MASKED' was being cancelled. Can you provide some
logs related to it? Maybe you can search for "CANCELING" in the jm and tm
logs.

Best,
Yangze Guo
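
For context, the "180 + seconds" in that fatal error comes from the task cancellation watchdog, which brings down the whole TaskManager when a task cannot be cancelled in time. The timeout is configurable (a sketch below; the value is a placeholder), but raising it only buys time, it does not fix whatever keeps the source stuck during cancellation.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CancellationTimeoutSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Same knob as task.cancellation.timeout in flink-conf.yaml (milliseconds).
        conf.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 300_000L);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(conf);
        // ... build and execute the job as usual
    }
}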

On Wed, Aug 18, 2021 at 1:20 AM Abhishek Rai  wrote:
>
> Before these message, there is the following message in the log:
>
> 2021-08-12 23:02:58.015 [Canceler/Interrupts for Source: MASKED]) 
> (1/1)#29103' did not react to cancelling signal for 30 seconds, but is stuck 
> in method:
>  java.base@11.0.11/jdk.internal.misc.Unsafe.park(Native Method)
> java.base@11.0.11/java.util.concurrent.locks.LockSupport.parkNanos(Unknown 
> Source)
> java.base@11.0.11/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown
>  Source)
> app//org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
> app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:341)
> app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
> app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
> app//org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
> app//org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
> app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
> app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> java.base@11.0.11/java.lang.Thread.run(Unknown Source)
>
> On Tue, Aug 17, 2021 at 9:22 AM Abhishek Rai  wrote:
>>
>> Thanks Yangze, indeed, I see the following in the log about 10s before the 
>> final crash (masked some sensitive data using `MASKED`):
>>
>> 2021-08-16 15:58:13.985 [Canceler/Interrupts for Source: MAKSED] WARN 
>> org.apache.flink.runtime.taskmanager.Task  - Task 'MASKED' did not react to 
>> cancelling signal for 30 seconds, but is stuck in method:
>>  java.base@11.0.11/jdk.internal.misc.Unsafe.park(Native Method)
>> java.base@11.0.11/java.util.concurrent.locks.LockSupport.park(Unknown Source)
>> java.base@11.0.11/java.util.concurrent.CompletableFuture$Signaller.block(Unknown
>>  Source)
>> java.base@11.0.11/java.util.concurrent.ForkJoinPool.managedBlock(Unknown 
>> Source)
>> java.base@11.0.11/java.util.concurrent.CompletableFuture.waitingGet(Unknown 
>> Source)
>> java.base@11.0.11/java.util.concurrent.CompletableFuture.join(Unknown Source)
>> app//org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:705)
>> app//org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
>> app//org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:637)
>> app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
>> app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>> java.base@11.0.11/java.lang.Thread.run(Unknown Source)
>>
>> 2021-08-16 15:58:13.986 [Cancellation Watchdog for Source: MASKED] ERROR 
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Fatal error 
>> occurred while executing the TaskManager. Shutting it down...
>> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully 
>> within 180 + seconds.
>>   at 
>> org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1718)
>>   at java.base/java.lang.Thread.run(Unknown Source)
>>
>>
>>
>> On Mon, Aug 16, 2021 at 7:05 PM Yangze Guo  wrote:
>>>
>>> Hi, Abhishek,
>>>
>>> Do you see something like "Fatal error occurred while executing the
>>> TaskManager" in your log or would you like to provide the whole task
>>> manager log?
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Tue, Aug 17, 2021 at 5:17 AM Abhishek Rai  wrote:
>>> >
>>> > Hello,
>>> >
>>> > In our production environment, running Flink 1.13 (Scala 2.11), where 
>>> > Flink has been working without issues with a dozen or so jobs running for 
>>> > a while, Flink taskmanager started crash looping with a period of ~4 
>>> > minutes per crash.  The stack trace is not very informative, therefore 
>>> > reaching out for help, see below.
>>> >
>>> > The only other thing that's unusual is that due to what might be a 
>>> > product issue (custom job code running on Flink), some or all of our 
>>> > tasks are also in a crash loop.  Still, I wasn't expecting taskmanager 
>>> > itself to die.  Does taskmanager have some built in feature to crash if 
>>> > all/most tasks are crashing?
>>> >
>>> > 2021-08-16 

Re: Handling HTTP Requests for Keyed Streams

2021-08-17 Thread JING ZHANG
Hi Rion,
Your solution is good.

It seems that you need to enrich a stream with data queried from an external HTTP
service. There is another solution for reference, similar to the mechanism
of lookup join in Flink SQL.
Lookup join in Flink SQL supports two modes: async mode and sync mode.
For each input record from the original source, it looks up the key in the
dimension table.
To avoid frequent external I/O, some dimension sources keep a cache in memory.
E.g. the HBase dimension table source uses an in-memory LRU cache: it caches
recently used values, and if an input record hits the cache, the external I/O
is avoided; otherwise an external
call is triggered and the result value is cached in the LRU
cache.
E.g. the Hive dimension table source loads all data into an in-memory cache,
which is refreshed regularly at the specified interval.

Hope the information is helpful.

Best,
JING ZHANG
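
To make the analogy concrete, here is a minimal sketch of that "sync lookup + LRU cache" pattern applied to the HTTP case. The endpoint URL, the String-in/String-out types, and the cache bounds are placeholders, and Guava is assumed to be available; only the first record per tenant, or a record arriving after the cache entry expires, pays for an HTTP round trip.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.TimeUnit;

public class TenantConfigEnricher extends RichMapFunction<String, String> {

    private transient HttpClient httpClient;
    private transient Cache<String, String> configCache;

    @Override
    public void open(Configuration parameters) {
        httpClient = HttpClient.newHttpClient();
        configCache = CacheBuilder.newBuilder()
                .maximumSize(10_000)                 // LRU-style bound on cached tenants
                .expireAfterWrite(1, TimeUnit.HOURS) // eventually refresh stale entries
                .build();
    }

    @Override
    public String map(String tenantId) throws Exception {
        // Blocking call, memoized per tenant.
        String config = configCache.get(tenantId, () -> fetchConfig(tenantId));
        return tenantId + "|" + config;
    }

    private String fetchConfig(String tenantId) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(
                URI.create("https://config.example.com/tenants/" + tenantId)).build();
        return httpClient.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }
}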


Rion Williams wrote on Tue, Aug 17, 2021 at 9:23 PM:

> Hi Caizhi,
>
> I don’t mind the request being synchronous (or not using the Async I/O
> connectors). Assuming I go down that route would this be the appropriate
> way to handle this? Specifically creating an HttpClient and storing the
> result in state and on a keyed stream if the state was empty?
>
> It makes sense to me, just wondering if there are any gotchas or
> recommendations in terms of a client that might support things like retries,
> and if this is a good pattern to accomplish this.
>
> Thanks,
>
> Rion
>
> On Aug 16, 2021, at 11:57 PM, Caizhi Weng  wrote:
>
> 
> Hi!
>
> As you mentioned that the configuration fetching is very infrequent, why
> don't you use a blocking approach to send HTTP requests and receive
> responses? This seems like a more reasonable solution to me.
>
Rion Williams wrote on Tue, Aug 17, 2021 at 4:00 AM:
>
>> Hi all,
>>
>> I've been exploring a few different options for storing tenant-specific
>> configurations within Flink state based on the messages I have flowing
>> through my job. Initially I had considered creating a source that would
>> periodically poll an HTTP API and connect that stream to my original event
>> stream.
>>
>> However, I realized that this configuration information would basically
>> never change and thus it doesn't quite make sense to poll so frequently. My
>> next approach would be to have a function that would be keyed (by tenant)
>> and storing the configuration for that tenant in state (and issue an HTTP
>> call when I did not have it). Something like this:
>>
>> class ConfigurationLookupFunction: KeyedProcessFunction<String, JsonObject, JsonObject>() {
>> // Tenant specific configuration
>> private lateinit var httpClient: HttpClient
>> private lateinit var configuration: ValueState<JsonObject>
>>
>> override fun open(parameters: Configuration) {
>> super.open(parameters)
>> httpClient = HttpClient.newHttpClient()
>> }
>>
>> override fun processElement(message: JsonObject, context: Context, out: 
>> Collector<JsonObject>) {
>> if (configuration.value() == null){
>> // Issue a request to the appropriate API to load the 
>> configuration
>> val url = 
>> HttpRequest.newBuilder(URI.create(".../${context.currentKey}")).build()
>> httpClient.send(..., {
>> // Store the configuration info within state here
>> configuration.update(...)
>> })
>>
>> out.collect(message)
>> }
>> else {
>> // Get the configuration information and pass it downstream to 
>> be used by the sink
>> out.collect(message)
>> }
>> }
>> }
>>
>> I didn't see any support for using the Async I/O functions from a keyed
>> context, otherwise I'd imagine that would be ideal. The requests themselves
>> should be very infrequent (initial call per tenant) and I'd imagine after
>> that the necessary configuration could be pulled/stored within the state
>> for that key.
>>
>> Is there a good way of handling this that I might be overlooking with an
>> existing Flink construct or function? I'd love to be able to leverage the
>> Async I/O connectors as they seem pretty well thought out.
>>
>> Thanks in advance!
>>
>> Rion
>>
>>
>>


flink Kinesis Consumer Connected but not consuming

2021-08-17 Thread tarun joshi
Hey All,

I am running flink in docker containers (image Tag
:flink:scala_2.11-java11) on EC2.

I am able to connect to a Kinesis Connector but nothing is being consumed.

My command to start Jobmanager and TaskManager :









*docker run \--rm \--volume /root/:/root/ \--env
JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \--env
TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}"
\--env
ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar"
\--name=jobmanager \--network flink-network \--publish 8081:8081
\flink:scala_2.11-java11 jobmanager &*

docker run \
  --rm \
  --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
  --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
  --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
  --name=taskmanager_0 \
  --network flink-network \
  flink:scala_2.11-java11 taskmanager &

2021-08-17 22:38:01,106 INFO org.apache.flink.streaming.connectors.kinesis.
FlinkKinesisConsumer [] -
Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='', shard='{ShardId: shardId-,HashKeyRange:
{StartingHashKey: 0,EndingHashKey: 34028236692093846346337460743176821144}
,SequenceNumberRange: {StartingSequenceNumber:
49600280467722672235426674687631661510244124728928239618,}}'}, starting
state set as sequence number LATEST_SEQUENCE_NUM

and a message like the following appears for each shard consumer:

2021-08-17 22:38:01,107 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher
[] - Subtask 0 will start consuming seeded shard
StreamShardHandle{streamName='web-clickstream', shard='{ShardId:
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
34028236692093846346337460743176821144},SequenceNumberRange:
{StartingSequenceNumber:
49600280467722672235426674687631661510244124728928239618,}}'} from sequence
number LATEST_SEQUENCE_NUM with ShardConsumer 0

My program is a simple test that reads a DataStream from Kinesis:

FlinkKinesisConsumer<String> kinesisConsumer =
new FlinkKinesisConsumer<>(
"", new SimpleStringSchema(),
getKafkaConsumerProperties());
env.addSource(kinesisConsumer).print();

env.execute("Read files in streaming fashion");
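
One thing that may be worth double-checking (an assumption, since the contents of getKafkaConsumerProperties() aren't shown): the log says the shard consumers start from LATEST_SEQUENCE_NUM, so the job will only print records produced after it connects. A sketch of consumer properties that replays existing records instead; the region value is a placeholder:

import java.util.Properties;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public static Properties kinesisConsumerProperties() {
    Properties props = new Properties();
    props.put(AWSConfigConstants.AWS_REGION, "us-east-1");           // placeholder region
    props.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");  // e.g. instance-profile creds
    // Start from the oldest available record instead of LATEST, to verify consumption end to end.
    props.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
    return props;
}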

Other Facts:


   1. I can see data flowing into our Kinesis stream continuously in the
   Monitoring tab of AWS.
   2. I was facing authorization issues accessing Kinesis in our AWS infra,
   but I resolved that by moving into the same security group as the
   Kinesis deployment and creating a role with full access to Kinesis.


Any pointers are really appreciated!

Thanks,
Tarun


Re: flink not able to get scheme for S3

2021-08-17 Thread tarun joshi
Thanks Chesnay ! that helped me resolve the issue
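
For anyone hitting the same thing, a small illustration of the point in Chesnay's reply below (this is a sketch, not the exact change applied here): the s3 filesystems shipped via ENABLE_BUILT_IN_PLUGINS are only picked up when the path is resolved through Flink's own FileSystem abstraction, whereas the failing stack trace goes through Hadoop's FileSystem directly. The bucket and key are placeholders.

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class FlinkFileSystemRead {
    public static void main(String[] args) throws Exception {
        Path path = new Path("s3://my-bucket/some-object"); // placeholder location
        FileSystem fs = path.getFileSystem();               // resolved via Flink's plugin mechanism
        try (FSDataInputStream in = fs.open(path)) {
            System.out.println("first byte: " + in.read());
        }
    }
}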


On Fri, 6 Aug 2021 at 04:31, Chesnay Schepler  wrote:

> The reason this doesn't work is that your application works directly
> against Hadoop.
> The filesystems in the plugins directory are only loaded via specific
> code-paths, specifically when the Flink FileSystem class is used.
> Since you are using Hadoop directly you are side-stepping the plugin
> mechanism.
>
> So you have to make sure that Hadoop + Hadoop's S3 filesystem is available
> to the client.
>
> On 06/08/2021 08:02, tarun joshi wrote:
>
> Hey All,
>
> I am running flink in docker containers (image Tag
> :flink:scala_2.11-java11) on EC2 and getting exception as I am trying to
> submit a job through the local ./opt/flink/bin
>
> *org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: No FileSystem for scheme "s3"*
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
> FileSystem for scheme "s3"
> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
> at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
> at
> org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
> at
> org.apache.flink.examples.java.wordcount.WordCount.printParquetData(WordCount.java:142)
> at
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:83)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> Source)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> Source)
> at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> ... 8 more
>
> This is the way I am invoking the Flink built-in S3 plugins for the
> Jobmanager and TaskManager:
>
>
> docker run \
>   --rm \
>   --volume /root/:/root/ \
>   --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
>   --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
>   --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
>   --name=jobmanager \
>   --network flink-network \
>   --publish 8081:8081 \
>   flink:scala_2.11-java11 jobmanager &
>
>
> docker run \
>   --rm \
>   --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
>   --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
>   --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
>   --name=taskmanager_0 \
>   --network flink-network \
>   flink:scala_2.11-java11 taskmanager &
>
> This is how I am defining dependencies in my pom.xml (I am working upon
> the Flink-Examples project from Flink Github repo).
>
> 
> <dependencies>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-java</artifactId>
>     <version>${project.version}</version>
>     <scope>provided</scope>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-scala_${scala.binary.version}</artifactId>
>     <version>${project.version}</version>
>     <scope>provided</scope>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-clients_${scala.binary.version}</artifactId>
>     <version>${project.version}</version>
>     <scope>provided</scope>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.parquet</groupId>
>     <artifactId>parquet-avro</artifactId>
>     <version>1.12.0</version>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.parquet</groupId>
>     <artifactId>parquet-column</artifactId>
>     <version>1.12.0</version>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.parquet</groupId>
>     <artifactId>parquet-hadoop</artifactId>
>     <version>1.12.0</version>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.hadoop</groupId>
>     <artifactId>hadoop-common</artifactId>
>     <version>3.3.1</version>
>   </dependency>
> </dependencies>
>
> I am also able to see the plugins being loaded for the JobManager and TaskManager:
>
> Linking flink-s3-fs-hadoop-1.13.1.jar to plugin directory
> Successfully enabled flink-s3-fs-hadoop-1.13.1.jar
> Linking flink-s3-fs-presto-1.13.1.jar to plugin directory
> Successfully enabled flink-s3-fs-presto-1.13.1.jar
>
> Let me if I 

Re: redis sink from flink

2021-08-17 Thread Jin Yi
great, thanks for the pointers everyone.

i'm going to pursue rolling my own built around lettuce since it seems more
feature-full wrt async semantics.

On Mon, Aug 16, 2021 at 7:21 PM Yik San Chan 
wrote:

> By the way, this post in Chinese showed how we do it exactly with code.
>
> https://yiksanchan.com/posts/flink-bulk-insert-redis
>
> And yes, it has buffered-writes support by leveraging Flink operator state
> and Redis pipelining. Feel free to let me know if you have any questions.
>
>
> On Tue, Aug 17, 2021 at 10:15 AM Yik San Chan 
> wrote:
>
>> Hi Jin,
>>
>> I was in the same boat. I tried the bahir redis connector at first, then I
>> felt it was very limited, so I rolled my own. It was actually quite
>> straightforward.
>>
>> All you need to do is to extend RichSinkFunction, then put your logic
>> inside. Regarding Redis clients, Jedis (https://github.com/redis/jedis)
>> is quite popular and simple to get started.
>>
>> Let me know if you love to learn more details about our implementation.
>>
>> Best,
>> Yik San
>>
>> On Tue, Aug 17, 2021 at 9:15 AM Jin Yi  wrote:
>>
>>> is apache bahir still a thing?  it hasn't been touched for months (since
>>> redis 2.8.5).
>>>
>>> as such, looking at the current flink connector docs, it's no longer
>>> pointing to anything from the bahir project.  looking around in either the
>>> flink or bahir newsgroups doesn't turn up anything regarding bahir's EOL.
>>>
>>> is the best bet for a flink to redis sink something i roll on my own
>>> (inclined to go this route w/ buffered writes)?  or should i try going
>>> through via kafka and using confluent's kafka redis connector (flink =>
>>> kafka => redis)?
>>>
>>
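
For reference, a minimal sketch of the "extend RichSinkFunction" approach mentioned above, using Jedis. The host/port and the Tuple2 key/value layout are placeholders, and buffered writes via operator state plus pipelining, as in the linked post, are deliberately left out.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

public class SimpleRedisSink extends RichSinkFunction<Tuple2<String, String>> {

    private final String host;
    private final int port;
    private transient Jedis jedis;

    public SimpleRedisSink(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void open(Configuration parameters) {
        jedis = new Jedis(host, port); // one connection per parallel sink instance
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) {
        jedis.set(value.f0, value.f1); // one SET per record; pipeline/batch for throughput
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}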


Re: Can we release new flink-connector-redis? Thanks!

2021-08-17 Thread Hongbo Miao
Really appreciate, Austin!

Hongbo
On Aug 17, 2021, 10:33 -0700, Austin Cawley-Edwards , 
wrote:
> Hi Hongbo,
>
> Thanks for your interest in the Redis connector! I'm not entirely sure what 
> the release process is like for Bahir, but I've pulled in @Robert Metzger who 
> has been involved in the project in the past and can give an update there.
>
> Best,
> Austin
>
> > On Tue, Aug 17, 2021 at 10:41 AM Hongbo Miao  
> > wrote:
> > > Hi Flink friends,
> > >
> > > I recently have a question about how to set TTL to make Redis keys expire 
> > > in flink-connector-redis.
> > > I originally posted at Stack Overflow at 
> > > https://stackoverflow.com/questions/68795044/how-to-set-ttl-to-make-redis-keys-expire-in-flink-connector-redis
> > >
> > > Then I found there is a pull request that added this feature about 2 years ago 
> > > at https://github.com/apache/bahir-flink/pull/66
> > > However, it didn't get released, which was confirmed by David on Stack 
> > > Overflow.
> > >
> > > I opened a ticket requesting a release at 
> > > https://issues.apache.org/jira/browse/BAHIR-279
> > > Please let me know if there is a better way to request this. Thanks!
> > >
> > > Best
> > > Hongbo
> > > www.hongbomiao.com


Task Managers having trouble registering after restart

2021-08-17 Thread Kevin Lam
Hi all,

I'm observing an issue sometimes, and it's been hard to reproduce, where
task managers are not able to register with the Flink cluster. We provision
only the number of task managers required to run a given application, and
so the absence of any of the task managers causes the job to enter a crash
loop where it fails to get the required task slots.

The failure occurs after a job has been running for a while, and when there
have been job and task manager restarts. We run in kubernetes so pod
disruptions occur from time to time, however we're running using the high
availability setup [0]

Has anyone encountered this before? Any suggestions?

Below are some error messages pulled from the task managers failing to
re-register.

```
] - Starting DefaultLeaderRetrievalService with
KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
2021-08-16 13:15:10,112 INFO
 org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
Starting DefaultLeaderElectionService with
KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
2021-08-16 13:15:10,205 INFO
 org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
[] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
streaming-sales-model-staging-restserver-leader.
2021-08-16 13:15:10,205 INFO
 org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
[] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
streaming-sales-model-staging-resourcemanager-leader.
2021-08-16 13:15:10,205 INFO
 org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
[] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
streaming-sales-model-staging-dispatcher-leader.
2021-08-16 13:15:10,211 INFO
 org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
- Starting DefaultLeaderRetrievalService with
KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
2021-08-16 13:16:26,103 WARN
 org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
- Error while retrieving the leader gateway. Retrying to connect to
akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
2021-08-16 13:16:30,978 WARN
 org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
- Error while retrieving the leader gateway. Retrying to connect to
akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
```

```
2021-08-15 14:02:21,078 ERROR org.apache.kafka.common.utils.KafkaThread
   [] - Uncaught exception in thread
'kafka-producer-network-thread |
trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
at
org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
~[?:?]
at
org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
~[?:?]
at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
at
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
~[?:?]
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
~[?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.ClassNotFoundException:
org.apache.kafka.clients.NetworkClient$1
at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
... 6 more
```

```
connection to [null] failed with java.net.ConnectException: Connection
refused: flink-jobmanager/10.28.65.100:6123
2021-08-16 13:14:59,668 WARN  akka.remote.ReliableDeliverySupervisor
[] - Association with remote system
[akka.tcp://flink@flink-jobmanager:6123] has failed, address is now gated
for [50] ms. Reason: [Association failed with
[akka.tcp://flink@flink-jobmanager:6123]] Caused by:
[java.net.ConnectException: Connection refused: flink-jobmanager/
10.28.65.100:6123]
2021-08-16 13:14:59,669 INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Could
not resolve ResourceManager address
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0,
retrying in 1 ms: Could not connect to rpc endpoint under address

Re: Can we release new flink-connector-redis? Thanks!

2021-08-17 Thread Austin Cawley-Edwards
Hi Hongbo,

Thanks for your interest in the Redis connector! I'm not entirely sure what
the release process is like for Bahir, but I've pulled in @Robert Metzger
 who has been involved in the project in the past and
can give an update there.

Best,
Austin

On Tue, Aug 17, 2021 at 10:41 AM Hongbo Miao 
wrote:

> Hi Flink friends,
>
> I recently have a question about how to set TTL to make Redis keys expire
> in flink-connector-redis.
> I originally posted at Stack Overflow at
> https://stackoverflow.com/questions/68795044/how-to-set-ttl-to-make-redis-keys-expire-in-flink-connector-redis
>
> Then I found there is a pull request that added this feature about 2 years ago
> at https://github.com/apache/bahir-flink/pull/66
> However, it didn't get released, which was confirmed by David on Stack
> Overflow.
>
> I opened a ticket requesting a release at
> https://issues.apache.org/jira/browse/BAHIR-279
> Please let me know if there is a better way to request this. Thanks!
>
> Best
> Hongbo
> www.hongbomiao.com
>


Re: Flink taskmanager in crash loop

2021-08-17 Thread Abhishek Rai
Before these message, there is the following message in the log:

2021-08-12 23:02:58.015 [Canceler/Interrupts for Source: MASKED])
(1/1)#29103' did not react to cancelling signal for 30 seconds, but is
stuck in method:
 java.base@11.0.11/jdk.internal.misc.Unsafe.park(Native Method)
java.base@11.0.11/java.util.concurrent.locks.LockSupport.parkNanos(Unknown
Source)
java.base@11.0.11/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown
Source)
app//org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:341)
app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
app//org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
app//org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
java.base@11.0.11/java.lang.Thread.run(Unknown Source)

On Tue, Aug 17, 2021 at 9:22 AM Abhishek Rai  wrote:

> Thanks Yangze, indeed, I see the following in the log about 10s before the
> final crash (masked some sensitive data using `MASKED`):
>
> 2021-08-16 15:58:13.985 [Canceler/Interrupts for Source: MAKSED] WARN
> org.apache.flink.runtime.taskmanager.Task  - Task 'MASKED' did not react to
> cancelling signal for 30 seconds, but is stuck in method:
>  java.base@11.0.11/jdk.internal.misc.Unsafe.park(Native Method)
> java.base@11.0.11/java.util.concurrent.locks.LockSupport.park(Unknown
> Source)
> java.base@11.0.11/java.util.concurrent.CompletableFuture$Signaller.block(Unknown
> Source)
> java.base@11.0.11/java.util.concurrent.ForkJoinPool.managedBlock(Unknown
> Source)
> java.base@11.0.11/java.util.concurrent.CompletableFuture.waitingGet(Unknown
> Source)
> java.base@11.0.11/java.util.concurrent.CompletableFuture.join(Unknown
> Source)
>
> app//org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:705)
>
> app//org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
>
> app//org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:637)
> app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
> app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> java.base@11.0.11/java.lang.Thread.run(Unknown Source)
>
> 2021-08-16 15:58:13.986 [Cancellation Watchdog for Source: MASKED] ERROR
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Fatal error
> occurred while executing the TaskManager. Shutting it down...
> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully
> within 180 + seconds.
>   at
> org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1718)
>   at java.base/java.lang.Thread.run(Unknown Source)
>
>
>
> On Mon, Aug 16, 2021 at 7:05 PM Yangze Guo  wrote:
>
>> Hi, Abhishek,
>>
>> Do you see something like "Fatal error occurred while executing the
>> TaskManager" in your log or would you like to provide the whole task
>> manager log?
>>
>> Best,
>> Yangze Guo
>>
>> On Tue, Aug 17, 2021 at 5:17 AM Abhishek Rai 
>> wrote:
>> >
>> > Hello,
>> >
>> > In our production environment, running Flink 1.13 (Scala 2.11), where
>> Flink has been working without issues with a dozen or so jobs running for a
>> while, Flink taskmanager started crash looping with a period of ~4 minutes
>> per crash.  The stack trace is not very informative, therefore reaching out
>> for help, see below.
>> >
>> > The only other thing that's unusual is that due to what might be a
>> product issue (custom job code running on Flink), some or all of our tasks
>> are also in a crash loop.  Still, I wasn't expecting taskmanager itself to
>> die.  Does taskmanager have some built in feature to crash if all/most
>> tasks are crashing?
>> >
>> > 2021-08-16 15:58:23.984 [main] ERROR
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Terminating
>> TaskManagerRunner with exit code 1.
>> > org.apache.flink.util.FlinkException: Unexpected failure during runtime
>> of TaskManagerRunner.
>> >   at
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:382)
>> >   at
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$3(TaskManagerRunner.java:413)
>> >   at java.base/java.security.AccessController.doPrivileged(Native
>> Method)
>> >   at java.base/javax.security.auth.Subject.doAs(Unknown Source)
>> >   at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>> >   at
>> 

Re: Flink taskmanager in crash loop

2021-08-17 Thread Abhishek Rai
Thanks Yangze, indeed, I see the following in the log about 10s before the
final crash (masked some sensitive data using `MASKED`):

2021-08-16 15:58:13.985 [Canceler/Interrupts for Source: MAKSED] WARN
org.apache.flink.runtime.taskmanager.Task  - Task 'MASKED' did not react to
cancelling signal for 30 seconds, but is stuck in method:
 java.base@11.0.11/jdk.internal.misc.Unsafe.park(Native Method)
java.base@11.0.11/java.util.concurrent.locks.LockSupport.park(Unknown
Source)
java.base@11.0.11/java.util.concurrent.CompletableFuture$Signaller.block(Unknown
Source)
java.base@11.0.11/java.util.concurrent.ForkJoinPool.managedBlock(Unknown
Source)
java.base@11.0.11/java.util.concurrent.CompletableFuture.waitingGet(Unknown
Source)
java.base@11.0.11/java.util.concurrent.CompletableFuture.join(Unknown
Source)
app//org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:705)
app//org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
app//org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:637)
app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
java.base@11.0.11/java.lang.Thread.run(Unknown Source)

2021-08-16 15:58:13.986 [Cancellation Watchdog for Source: MASKED] ERROR
org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Fatal error
occurred while executing the TaskManager. Shutting it down...
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully
within 180 + seconds.
  at
org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1718)
  at java.base/java.lang.Thread.run(Unknown Source)



On Mon, Aug 16, 2021 at 7:05 PM Yangze Guo  wrote:

> Hi, Abhishek,
>
> Do you see something like "Fatal error occurred while executing the
> TaskManager" in your log or would you like to provide the whole task
> manager log?
>
> Best,
> Yangze Guo
>
> On Tue, Aug 17, 2021 at 5:17 AM Abhishek Rai 
> wrote:
> >
> > Hello,
> >
> > In our production environment, running Flink 1.13 (Scala 2.11), where
> Flink has been working without issues with a dozen or so jobs running for a
> while, Flink taskmanager started crash looping with a period of ~4 minutes
> per crash.  The stack trace is not very informative, therefore reaching out
> for help, see below.
> >
> > The only other thing that's unusual is that due to what might be a
> product issue (custom job code running on Flink), some or all of our tasks
> are also in a crash loop.  Still, I wasn't expecting taskmanager itself to
> die.  Does taskmanager have some built in feature to crash if all/most
> tasks are crashing?
> >
> > 2021-08-16 15:58:23.984 [main] ERROR
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Terminating
> TaskManagerRunner with exit code 1.
> > org.apache.flink.util.FlinkException: Unexpected failure during runtime
> of TaskManagerRunner.
> >   at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:382)
> >   at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$3(TaskManagerRunner.java:413)
> >   at java.base/java.security.AccessController.doPrivileged(Native Method)
> >   at java.base/javax.security.auth.Subject.doAs(Unknown Source)
> >   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
> >   at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >   at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:413)
> >   at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:396)
> >   at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:354)
> > Caused by: java.util.concurrent.TimeoutException: null
> >   at
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
> >   at
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
> >   at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
> >   at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
> Source)
> >   at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
> >   at
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
> Source)
> >   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source)
> >   at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> >   at java.base/java.lang.Thread.run(Unknown Source)
> > 2021-08-16 15:58:23.986 [TaskExecutorLocalStateStoresManager shutdown
> hook] INFO  o.a.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
> Shutting down TaskExecutorLocalStateStoresManager.
> >
> >

Can we release new flink-connector-redis? Thanks!

2021-08-17 Thread Hongbo Miao
Hi Flink friends,

I recently have a question about how to set TTL to make Redis keys expire in 
flink-connector-redis.
I originally posted at Stack Overflow at 
https://stackoverflow.com/questions/68795044/how-to-set-ttl-to-make-redis-keys-expire-in-flink-connector-redis

Then I found there is a pull request that added this feature about 2 years ago at 
https://github.com/apache/bahir-flink/pull/66
However, it didn't get released, which was confirmed by David on Stack Overflow.

I opened a ticket requesting a release at 
https://issues.apache.org/jira/browse/BAHIR-279
Please let me know if there is a better way to request this. Thanks!

Best
Hongbo
www.hongbomiao.com


Re: Handling HTTP Requests for Keyed Streams

2021-08-17 Thread Rion Williams
Hi Caizhi,

I don’t mind the request being synchronous (or not using the Async I/O 
connectors). Assuming I go down that route would this be the appropriate way to 
handle this? Specifically creating an HttpClient and storing the result in 
state and on a keyed stream if the state was empty?

It makes sense to me, just wondering if there are any gotchas or 
recommendations in terms of a client that might support things like retries, and 
if this is a good pattern to accomplish this.

Thanks,

Rion

> On Aug 16, 2021, at 11:57 PM, Caizhi Weng  wrote:
> 
> 
> Hi!
> 
> As you mentioned that the configuration fetching is very infrequent, why 
> don't you use a blocking approach to send HTTP requests and receive 
> responses? This seems like a more reasonable solution to me.
> 
Rion Williams wrote on Tue, Aug 17, 2021 at 4:00 AM:
>> Hi all,
>> 
>> I've been exploring a few different options for storing tenant-specific 
>> configurations within Flink state based on the messages I have flowing 
>> through my job. Initially I had considered creating a source that would 
>> periodically poll an HTTP API and connect that stream to my original event 
>> stream.
>> 
>> However, I realized that this configuration information would basically 
>> never change and thus it doesn't quite make sense to poll so frequently. My 
>> next approach would be to have a function that would be keyed (by tenant) 
>> and storing the configuration for that tenant in state (and issue an HTTP 
>> call when I did not have it). Something like this:
>> 
>> class ConfigurationLookupFunction: KeyedProcessFunction<String, JsonObject, JsonObject>() {
>> // Tenant specific configuration
>> private lateinit var httpClient: HttpClient
>> private lateinit var configuration: ValueState<JsonObject>
>> 
>> override fun open(parameters: Configuration) {
>> super.open(parameters)
>> httpClient = HttpClient.newHttpClient()
>> }
>> 
>> override fun processElement(message: JsonObject, context: Context, out: 
>> Collector<JsonObject>) {
>> if (configuration.value() == null){
>> // Issue a request to the appropriate API to load the 
>> configuration
>> val url = 
>> HttpRequest.newBuilder(URI.create(".../${context.currentKey}")).build()
>> httpClient.send(..., {
>> // Store the configuration info within state here
>> configuration.update(...)
>> })
>> 
>> out.collect(message)
>> }
>> else {
>> // Get the configuration information and pass it downstream to 
>> be used by the sink
>> out.collect(message)
>> }
>> }
>> }
>> I didn't see any support for using the Async I/O functions from a keyed 
>> context, otherwise I'd imagine that would be ideal. The requests themselves 
>> should be very infrequent (initial call per tenant) and I'd imagine after 
>> that the necessary configuration could be pulled/stored within the state for 
>> that key.
>> 
>> Is there a good way of handling this that I might be overlooking with an 
>> existing Flink construct or function? I'd love to be able to leverage the 
>> Async I/O connectors as they seem pretty well thought out.
>> 
>> Thanks in advance!
>> 
>> Rion
>> 
>> 


Can the operator state API be used in a Flink SQL UDF?

2021-08-17 Thread andrew
 Hi:
 We read a Kafka stream with Flink SQL and want to monitor user information: when it changes compared with the previous state value, the latest user information should be emitted.

Re: Reply: How to monitor Kafka lag

2021-08-17 Thread andrew



@Jimmy Zhang
Take a look at checkpoints/savepoints: intermediate computation results can be written to external storage such as HDFS at regular intervals.

On 2021-08-09 09:51:21, "Jimmy Zhang" wrote:
>Hello, I see you are using Kafka-related metrics and I'd like to ask a question. Have you ever seen the metrics reset to zero after restarting a Kafka sink 
>job? Doesn't that make it impossible to keep accumulating the counts? We want to do data reconciliation and query output statistics for different time ranges, so a reset in the middle could be a problem. That's why I'm asking; any reply is greatly appreciated!
>
>
>
>
>|
>Best,
>Jimmy
>|
>
>Signature is customized by Netease Mail Master
>
>On Jul 28, 2021 at 17:58, jie mei wrote:
>Hi all,
>
>We alert by configuring alerting rules in Grafana on the collected Flink Kafka
>metrics (taskmanager_job_task_operator_KafkaConsumer_records).
>
>xuhaiLong wrote on Wed, Jul 28, 2021 at 5:46 PM:
>
>> Take a look at kafka_exporter: it reports the consumption status of all groups, and you can then configure different monitoring rules.
>>
>>
>> On Jul 28, 2021 at 17:39, laohu<2372554...@qq.com.INVALID> wrote:
>> Hi comsir
>>
>> Kafka's console is fairly limited; if you want to know the lag you have to track it yourself.
>>
>> Ways to track it:
>>
>> 1. For each service, subtract the group id's offset from the topic's offset.
>>
>> 2. Ideally also compute the various consumption rates.
>>
>> 3. The RocketMQ console shows consumption progress and can serve as a reference.
>>
>>
>> On 2021/7/28 at 11:02 AM, 龙逸尘 wrote:
>> Hi comsir,
>> The lag obtained by subtracting the current group offset from the offsets in the Kafka cluster metadata is fairly accurate.
>> The group id needs to be maintained by yourself.
>>
>> comsir <609326...@qq.com.invalid> wrote on Tue, Jul 20, 2021 at 12:41 PM:
>>
>> hi all
>> For Flink jobs with Kafka as the source, how do you all monitor the Kafka lag?
>> The purpose of monitoring the lag: 1. dashboard display, 2. alerting when lag occurs.
>> A few questions:
>> 1. Flink exposes many native metrics for this, but after looking into them none seem very accurate. Which metric do you use?
>> 2. How do you get the groupId? If multiple groups consume the topic, how do you tell them apart?
>> 3. Can the lag be computed by subtracting the current offset from the Kafka cluster-side metadata?
>> 4. Is there an elegant way to implement this?
>> Many thanks, looking forward to answers.
>>
>
>
>--
>
>*Best Regards*
>*Jeremy Mei*


Re: Upgrading from Flink on YARN 1.9 to 1.11

2021-08-17 Thread David Morávek
Hi Andreas,

the problem here is that the command you're using is starting a per-job
cluster (which is obvious from the used deployment method "
YarnClusterDescriptor.deployJobCluster"). Apparently the `-m yarn-cluster`
flag is deprecated and no longer supported, I think this is something we
should completely remove in the near future. Also, this was always supposed
to start your job in per-job mode, but unfortunately in older versions this
was kind of simulated using a session cluster, so I'd say it just worked
by accident (a.k.a. an "undocumented bug / feature").

What you really want to do is to start a session cluster upfront and then
use a `yarn-session` deployment target (where you need to provide the yarn
application id so Flink can search for the active JobManager). This is well
documented in the yarn section of the docs [1].

Can you please try this approach and let me know if that helped?

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode

Best,
D.

On Mon, Aug 16, 2021 at 8:52 PM Hailu, Andreas [Engineering] <
andreas.ha...@gs.com> wrote:

> Hi David,
>
>
>
> You’re correct about classpathing problems – thanks for your help in
> spotting them. I was able to get past that exception by removing some
> conflicting packages in my shaded JAR, but I’m seeing something else that’s
> interesting. With the 2 threads trying to submit jobs, one of the threads
> is able submit and process data successfully, while the other thread fails.
>
>
>
> Log snippet:
>
> 2021-08-16 13:43:12,893 [thread-1] INFO  YarnClusterDescriptor - Cluster
> specification: ClusterSpecification{masterMemoryMB=4096,
> taskManagerMemoryMB=18432, slotsPerTaskManager=2}
>
> 2021-08-16 13:43:12,893 [thread-2] INFO  YarnClusterDescriptor - Cluster
> specification: ClusterSpecification{masterMemoryMB=4096,
> taskManagerMemoryMB=18432, slotsPerTaskManager=2}
>
> 2021-08-16 13:43:12,897 [thread-2] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-16 13:43:12,897 [thread-1] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-16 13:43:13,104 [thread-2] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-16 13:43:13,104 [thread-1] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-16 13:43:20,475 [thread-1] INFO  YarnClusterDescriptor - Adding
> delegation token to the AM container.
>
> 2021-08-16 13:43:20,488 [thread-1] INFO  DFSClient - Created
> HDFS_DELEGATION_TOKEN token 56247060 for delp on ha-hdfs:d279536
>
> 2021-08-16 13:43:20,512 [thread-1] INFO  TokenCache - Got dt for
> hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536,
> Ident: (HDFS_DELEGATION_TOKEN token 56247060 for delp)
>
> 2021-08-16 13:43:20,513 [thread-1] INFO  Utils - Attempting to obtain
> Kerberos security token for HBase
>
> 2021-08-16 13:43:20,513 [thread-1] INFO  Utils - HBase is not available
> (not packaged with this application): ClassNotFoundException :
> "org.apache.hadoop.hbase.HBaseConfiguration".
>
> 2021-08-16 13:43:20,564 [thread-2] WARN  YarnClusterDescriptor - Add job
> graph to local resource fail.
>
> 2021-08-16 13:43:20,570 [thread-1] INFO  YarnClusterDescriptor -
> Submitting application master application_1628992879699_11275
>
> 2021-08-16 13:43:20,570 [thread-2] ERROR FlowDataBase - Exception running
> data flow for thread-2
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Could not
> deploy Yarn job cluster.
>
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
>
> at
> org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
>
> at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973)
>
> at
> org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124)
>
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72)
>
> at
> com.gs.ep.da.lake.refinerlib.flink.ExecutionEnvironmentWrapper.execute(ExecutionEnvironmentWrapper.java:49)
>
> ...
>
> Caused by: java.io.IOException: Filesystem closed
>
> at
> org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:826)
>
> at
> org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2152)
>
> ...
>
> 2021-08-16 13:43:20,979 [thread-1] INFO  TimelineClientImpl - Timeline
> service address: http://d279536-003.dc.gs.com:8188/ws/v1/timeline/
>
> 2021-08-16 13:43:21,377 [thread-1] INFO  YarnClientImpl - Submitted
> application application_1628992879699_11275
>
> 2021-08-16 13:43:21,377 [thread-1] INFO  YarnClusterDescriptor - Waiting
> for the cluster to be allocated
>
> 

PyFlink StreamingFileSink bulk-encoded format (Avro)

2021-08-17 Thread Kamil ty
Hello,

I'm trying to save my data stream to an Avro file on HDFS. In Flink
documentation I can only see explanations for Java/Scala. However, I can't
seem to find a way to do it in PyFlink. Is this possible to do in PyFlink
currently?

Kind Regards
Kamil



Re: Flink program fails to start when deployed on YARN in HA mode

2021-08-17 Thread 周瑞
Hello, my version is 1.13.1.


------ Original ------
From: "Yang Wang"
https://issues.apache.org/jira/browse/FLINK-19212

Best,
Yang

周瑞 

Re: Flink program fails to start when deployed on YARN in HA mode

2021-08-17 Thread Yang Wang
From the error, this looks like a known issue [1] that was already fixed in 1.11.2.

[1]. https://issues.apache.org/jira/browse/FLINK-19212

Best,
Yang

周瑞 wrote on Tue, Aug 17, 2021 at 11:04 AM:

> Hello: the Flink program is deployed on YARN and started in Application Mode. It starts
> fine without HA; after configuring HA, startup fails. Could you please help check what is causing this?
>
>
> The HA configuration is as follows:
> high-availability: zookeeper
> high-availability.storageDir: hdfs://mycluster/flink/ha
> high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
> high-availability.zookeeper.path.root: /flink
> high-availability.cluster-id: /flink_cluster
>
>
> The exception is as follows:
> 2021-08-17 10:24:18,938 INFO
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Starting DefaultLeaderElectionService with
> ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}.
> 2021-08-17 10:25:09,706 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler
> [] - Unhandled exception.
> org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to
> serialize the result for RPC call : requestTaskManagerDetailsInfo.
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:404)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:360)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
> ~[?:1.8.0_292]
> at
> java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848)
> ~[?:1.8.0_292]
> at
> java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168)
> ~[?:1.8.0_292]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:352)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:319)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.actor.Actor.aroundReceive$(Actor.scala:515)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> Caused by: java.io.NotSerializableException:
> org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> ~[?:1.8.0_292]
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> ~[?:1.8.0_292]
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> 

Re: Problems with reading ORC files with S3 filesystem

2021-08-17 Thread Piotr Jagielski
Hi David,

Thanks for your answer. I finally managed to read ORC files by:
- switching to s3a:// in my Flink SQL table path parameter
- providing all the properties in Hadoop's core-site.xml file (fs.s3a.endpoint, 
fs.s3a.path.style.access, fs.s3a.aws.credentials.provider, fs.s3a.access.key, 
fs.s3a.secret.key)
- setting HADOOP_CONF_DIR env variable pointing to directory containing 
core-site.xml
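
A minimal core-site.xml along these lines might look as follows (the endpoint,
credentials provider, and key values below are placeholders, not the actual values
from this setup); HADOOP_CONF_DIR then points at the directory containing it:

<?xml version="1.0"?>
<configuration>
    <!-- S3-compatible endpoint (e.g. MinIO); value is a placeholder -->
    <property>
        <name>fs.s3a.endpoint</name>
        <value>http://minio:9000</value>
    </property>
    <property>
        <name>fs.s3a.path.style.access</name>
        <value>true</value>
    </property>
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>PLACEHOLDER_ACCESS_KEY</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>PLACEHOLDER_SECRET_KEY</value>
    </property>
</configuration>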

Regards,
Piotr

On 2021/08/16 09:07:48, David Morávek  wrote: 
> Hi Piotr,
> 
> unfortunately this is a known long-standing issue [1]. The problem is that
> ORC format is not using Flink's filesystem abstraction for actual reading
> of the underlying file, so you have to adjust your Hadoop config
> accordingly. There is also a corresponding SO question [2] covering this.
> 
> I think a proper fix would actually require changing the interface on ORC
> side, because currently there seems to be no easy way to switch the FS
> implementation (I've just quickly checked OrcFile class, so this might not
> be 100% accurate).
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10989
> [2] https://stackoverflow.com/a/53435359
> 
> Best,
> D.
> 
> On Sat, Aug 14, 2021 at 11:40 AM Piotr Jagielski  wrote:
> 
> > Hi,
> > I want to use Flink SQL filesystem to read ORC file via S3 filesystem on
> > Flink 1.13. My table definition looks like this:
> >
> > create or replace table xxx
> >  (..., startdate string)
> >  partitioned by (startdate) with ('connector'='filesystem',
> > 'format'='orc', 'path'='s3://xxx/orc/yyy')
> >
> > I followed Flink's S3 guide and installed S3 libs as plugin. I have MinIO
> > as S3 provider and it works for Flinks checkpoints and HA files.
> > The SQL connector also works when I use CSV or Avro formats. The problems
> > start with ORC
> >
> > 1. If I just put flink-orc on job's classpath I get error on JobManager:
> > Caused by: java.lang.NoClassDefFoundError:
> > org/apache/hadoop/conf/Configuration
> > at
> > org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121)
> > ~[?:?]
> > at
> > org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88)
> > ~[?:?]
> > at
> > org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118)
> > ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]
> >
> > 2. I managed to put hadoop common libs on the classpath by this maven
> > setup:
> >
> > <dependency>
> >     <groupId>org.apache.flink</groupId>
> >     <artifactId>flink-orc_${scala.binary.version}</artifactId>
> >     <version>${flink.version}</version>
> >     <exclusions>
> >         <exclusion>
> >             <groupId>org.apache.orc</groupId>
> >             <artifactId>orc-core</artifactId>
> >         </exclusion>
> >     </exclusions>
> > </dependency>
> > <dependency>
> >     <groupId>org.apache.orc</groupId>
> >     <artifactId>orc-core</artifactId>
> >     <version>1.5.6</version>
> > </dependency>
> > <dependency>
> >     <groupId>org.apache.orc</groupId>
> >     <artifactId>orc-shims</artifactId>
> >     <version>1.5.6</version>
> > </dependency>
> > <dependency>
> >     <groupId>net.java.dev.jets3t</groupId>
> >     <artifactId>jets3t</artifactId>
> >     <version>0.9.0</version>
> > </dependency>
> >
> > Now the job is accepted by the JobManager, but execution fails due to missing
> > AWS credentials:
> > Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and
> > Secret Access Key must be specified as the username or password
> > (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or
> > fs.s3.awsSecretAccessKey properties (respectively).
> > at
> > org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
> > at
> > org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
> > at
> > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> > Method)
> > at
> > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> > Source)
> > at
> > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> > Source)
> > at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> > at
> > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> > at
> > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> > at com.sun.proxy.$Proxy76.initialize(Unknown Source)
> > at
> > org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
> > at
> > org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
> > at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
> > at
> > org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
> > at 

Re: NullPointerException in StateTable.put()

2021-08-17 Thread László Ciople
I removed that line from the code and it seems to have solved the problem.
Thank you very much! :)
All the best,
Laszlo

On Tue, Aug 17, 2021 at 9:54 AM László Ciople 
wrote:

> Ok, thank you for the tips. I will modify it and get back to you :)
>
> On Tue, Aug 17, 2021 at 9:42 AM David Morávek  wrote:
>
>> Hi Laszlo,
>>
>> Please use reply-all for mailing list replies. This may help others
>> find their answer in the future ;)
>>
>>
>>> sb.append(DeviceDetail.class.getName()).append('@').append(Integer.toHexString(System.identityHashCode(this))).append('[');
>>
>>
>> This part will again make your key non-deterministic, because you're
>> using a memory address inside the content for hashing. I don't see any
>> other problem in the snippet you've sent.
>>
>> Best,
>> D.
>>
>> On Tue, Aug 17, 2021 at 8:33 AM László Ciople 
>> wrote:
>>
>>> I modified the code to use a sha256 hash instead of the hashCode when
>>> the id is not present in the object. The same behaviour still occurs.
>>> Here is the code that selects the key:
>>> @Override
>>> public String getKey(AzureADIamEvent value) throws Exception {
>>> // key is the device id or the hash of the device properties
>>> String key = value.payload.properties.deviceDetail.deviceId;
>>>
>>> if (key == null || key.equals("")) {
>>> LOG.warn("Device id is null or empty, using sha256 value");
>>> key = DigestUtils.sha256Hex(value.payload.properties.
>>> deviceDetail.toString());
>>> }
>>>
>>> return key;
>>> }
>>>
>>> And the definition of the class the key is created from:
>>> public class DeviceDetail {
>>> @JsonProperty("browser")
>>> public String browser;
>>> @JsonProperty("deviceId")
>>> public String deviceId;
>>> @JsonProperty("displayName")
>>> public String displayName;
>>> @JsonProperty("operatingSystem")
>>> public String operatingSystem;
>>> @JsonProperty("trustType")
>>> public String trustType;
>>> @Override
>>> public String toString() {
>>> StringBuilder sb = new StringBuilder();
>>> sb.append(DeviceDetail.class.getName()).append('@').append(
>>> Integer.toHexString(System.identityHashCode(this))).append('[');
>>> sb.append("browser");
>>> sb.append('=');
>>> sb.append(((this.browser == null)?"":this.browser));
>>> sb.append(',');
>>> sb.append("deviceId");
>>> sb.append('=');
>>> sb.append(((this.deviceId == null)?"":this.deviceId));
>>> sb.append(',');
>>> sb.append("displayName");
>>> sb.append('=');
>>> sb.append(((this.displayName == null)?"":this.displayName
>>> ));
>>> sb.append(',');
>>> sb.append("operatingSystem");
>>> sb.append('=');
>>> sb.append(((this.operatingSystem == null)?"":this.
>>> operatingSystem));
>>> sb.append(',');
>>> sb.append("trustType");
>>> sb.append('=');
>>> sb.append(((this.trustType == null)?"":this.trustType));
>>> sb.append(',');
>>> if (sb.charAt((sb.length()- 1)) == ',') {
>>> sb.setCharAt((sb.length()- 1), ']');
>>> } else {
>>> sb.append(']');
>>> }
>>> return sb.toString();
>>> }
>>> }
>>>
>>>
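
For illustration, a deterministic fallback key along the lines suggested above can
be built from the stable fields directly, without going through toString() at all.
This is only a sketch (the field set mirrors the DeviceDetail class quoted above;
DigestUtils is the commons-codec helper already used in the snippet):

import org.apache.commons.codec.digest.DigestUtils;

public class DeviceKeys {
    // Deterministic key: the device id if present, otherwise a sha256 over the
    // stable fields only (no System.identityHashCode), so the same device
    // always maps to the same key across JVMs, restarts and state restores.
    public static String keyFor(String deviceId, String browser, String displayName,
                                String operatingSystem, String trustType) {
        if (deviceId != null && !deviceId.isEmpty()) {
            return deviceId;
        }
        String stableContent = String.join("|",
                nullToEmpty(browser),
                nullToEmpty(displayName),
                nullToEmpty(operatingSystem),
                nullToEmpty(trustType));
        return DigestUtils.sha256Hex(stableContent);
    }

    private static String nullToEmpty(String s) {
        return s == null ? "" : s;
    }
}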


Re: NullPointerException in StateTable.put()

2021-08-17 Thread László Ciople
Ok, thank you for the tips. I will modify it and get back to you :)

On Tue, Aug 17, 2021 at 9:42 AM David Morávek  wrote:

> Hi Laszlo,
>
> Please use reply-all for mailing list replies. This may help others
> find their answer in the future ;)
>
>
>> sb.append(DeviceDetail.class.getName()).append('@').append(Integer.toHexString(System.identityHashCode(this))).append('[');
>
>
> This part will again make your key non-deterministic, because you're using
> a memory address inside the content for hashing. I don't see any other
> problem in the snippet you've sent.
>
> Best,
> D.
>
> On Tue, Aug 17, 2021 at 8:33 AM László Ciople 
> wrote:
>
>> I modified the code to use a sha256 hash instead of the hashCode when the
>> id is not present in the object. The same behaviour still occurs.
>> Here is the code that selects the key:
>> @Override
>> public String getKey(AzureADIamEvent value) throws Exception {
>> // key is the device id or the hash of the device properties
>> String key = value.payload.properties.deviceDetail.deviceId;
>>
>> if (key == null || key.equals("")) {
>> LOG.warn("Device id is null or empty, using sha256 value");
>> key = DigestUtils.sha256Hex(value.payload.properties.
>> deviceDetail.toString());
>> }
>>
>> return key;
>> }
>>
>> And the definition of the class the key is created from:
>> public class DeviceDetail {
>> @JsonProperty("browser")
>> public String browser;
>> @JsonProperty("deviceId")
>> public String deviceId;
>> @JsonProperty("displayName")
>> public String displayName;
>> @JsonProperty("operatingSystem")
>> public String operatingSystem;
>> @JsonProperty("trustType")
>> public String trustType;
>> @Override
>> public String toString() {
>> StringBuilder sb = new StringBuilder();
>> sb.append(DeviceDetail.class.getName()).append('@').append(
>> Integer.toHexString(System.identityHashCode(this))).append('[');
>> sb.append("browser");
>> sb.append('=');
>> sb.append(((this.browser == null)?"":this.browser));
>> sb.append(',');
>> sb.append("deviceId");
>> sb.append('=');
>> sb.append(((this.deviceId == null)?"":this.deviceId));
>> sb.append(',');
>> sb.append("displayName");
>> sb.append('=');
>> sb.append(((this.displayName == null)?"":this.displayName
>> ));
>> sb.append(',');
>> sb.append("operatingSystem");
>> sb.append('=');
>> sb.append(((this.operatingSystem == null)?"":this.
>> operatingSystem));
>> sb.append(',');
>> sb.append("trustType");
>> sb.append('=');
>> sb.append(((this.trustType == null)?"":this.trustType));
>> sb.append(',');
>> if (sb.charAt((sb.length()- 1)) == ',') {
>> sb.setCharAt((sb.length()- 1), ']');
>> } else {
>> sb.append(']');
>> }
>> return sb.toString();
>> }
>> }
>>
>>


Does Flink support the TRUNCATE TABLE statement?

2021-08-17 Thread Asahi Lee
hi!
  Does Flink support the TRUNCATE TABLE statement? When using Flink to operate on
Hive tables, can TRUNCATE TABLE be used?

Re: RabbitMQ 3.9+ Native Streams

2021-08-17 Thread David Morávek
This would be awesome! We have the contribution guide [1] that should give
you a rough idea on how to approach the contribution. Let me
know if you need any further guidance, I'd be happy to help ;)

[1] https://flink.apache.org/contributing/how-to-contribute.html

Best,
D.

On Tue, Aug 17, 2021 at 1:17 AM Rob Englander 
wrote:

> I will definitely consider the contribution idea :)
>
>
> On Mon, Aug 16, 2021 at 3:16 PM David Morávek  wrote:
>
>> Hi Rob,
>>
>> there is currently no on-going effort for this topic, I think this would
>> be a really great contribution though. This seems to be pushing RabbitMQ
>> towards new usages ;)
>>
>> Best,
>> D.
>>
>> On Mon, Aug 16, 2021 at 8:16 PM Rob Englander 
>> wrote:
>>
>>> I'm wondering if there's any work underway to develop
>>> DataSource/DataSink for RabbitMQ's streams recently released in RMQ 3.9?
>>>
>>> Rob Englander
>>>
>>


Flink 1.13.1 INSERT OVERWRITE into Hive: is the target table cleared when the SELECT returns no rows?

2021-08-17 Thread Asahi Lee
hi!
For the SQL below, when the SELECT returns 0 rows, will the existing data in the Hive target table be cleared?
INSERT OVERWRITE target_table SELECT * from source_table where id  10;

Re: NullPointerException in StateTable.put()

2021-08-17 Thread David Morávek
Hi Laszlo,

Please use reply-all for mailing list replies. This may help others find
their answer in the future ;)


> sb.append(DeviceDetail.class.getName()).append('@').append(Integer.toHexString(System.identityHashCode(this))).append('[');


This part will again make your key non-deterministic, because you're using
a memory address inside the content for hashing. I don't see any other
problem in the snippet you've sent.

Best,
D.

On Tue, Aug 17, 2021 at 8:33 AM László Ciople 
wrote:

> I modified the code to use a sha256 hash instead of the hashCode when the
> id is not present in the object. The same behaviour still occurs.
> Here is the code that selects the key:
> @Override
> public String getKey(AzureADIamEvent value) throws Exception {
> // key is the device id or the hash of the device properties
> String key = value.payload.properties.deviceDetail.deviceId;
>
> if (key == null || key.equals("")) {
> LOG.warn("Device id is null or empty, using sha256 value");
> key = DigestUtils.sha256Hex(value.payload.properties.
> deviceDetail.toString());
> }
>
> return key;
> }
>
> And the definition of the class the key is created from:
> public class DeviceDetail {
> @JsonProperty("browser")
> public String browser;
> @JsonProperty("deviceId")
> public String deviceId;
> @JsonProperty("displayName")
> public String displayName;
> @JsonProperty("operatingSystem")
> public String operatingSystem;
> @JsonProperty("trustType")
> public String trustType;
> @Override
> public String toString() {
> StringBuilder sb = new StringBuilder();
> sb.append(DeviceDetail.class.getName()).append('@').append(Integer
> .toHexString(System.identityHashCode(this))).append('[');
> sb.append("browser");
> sb.append('=');
> sb.append(((this.browser == null)?"":this.browser));
> sb.append(',');
> sb.append("deviceId");
> sb.append('=');
> sb.append(((this.deviceId == null)?"":this.deviceId));
> sb.append(',');
> sb.append("displayName");
> sb.append('=');
> sb.append(((this.displayName == null)?"":this.displayName));
> sb.append(',');
> sb.append("operatingSystem");
> sb.append('=');
> sb.append(((this.operatingSystem == null)?"":this.
> operatingSystem));
> sb.append(',');
> sb.append("trustType");
> sb.append('=');
> sb.append(((this.trustType == null)?"":this.trustType));
> sb.append(',');
> if (sb.charAt((sb.length()- 1)) == ',') {
> sb.setCharAt((sb.length()- 1), ']');
> } else {
> sb.append(']');
> }
> return sb.toString();
> }
> }
>
>


Re: Reply: How to monitor Kafka lag

2021-08-17 Thread RS
1. The metric counters are indeed reset to zero each time (e.g. on restart).
2. For data reconciliation, you can persist each statistics snapshot with its timestamp, and then sum over the queried time range to reconcile.
On 2021-08-09 09:51:43, "Jimmy Zhang" wrote:
>Hello, I see you are using the Kafka-related metrics and I'd like to ask a question. Have you
>ever seen the metrics reset to zero after restarting a Kafka sink job? Doesn't that make it impossible to keep accumulating the counts? We want to do data reconciliation by querying output counts for different time ranges, and a reset in the middle would break that, so I'd like to ask about it. Any reply is greatly appreciated!
>
>
>
>
>Best,
>Jimmy
>
>
>On 2021-07-28 17:58, jie mei wrote:
>Hi all,
>
>We alert by configuring alerting rules in Grafana on the collected Flink Kafka
>metrics (taskmanager_job_task_operator_KafkaConsumer_records).
>
>xuhaiLong wrote on Wed, Jul 28, 2021 at 5:46 PM:
>
>> Take a look at kafka_exporter: it collects the consumption status of all groups, and you can then configure different rules on top of it for monitoring.
>>
>>
>> On 2021-07-28 17:39, laohu <2372554...@qq.com.INVALID> wrote:
>> Hi comsir
>>
>> Kafka's console is fairly limited; if you want to know the lag you have to track it yourself.
>>
>> Ways to track it:
>>
>> 1. For each service, subtract the group id's offset from the topic's offset.
>>
>> 2. Where possible, also compute the various consumption rates.
>>
>> 3. The RocketMQ console shows consumption progress and can serve as a reference.
>>
>>
>> On 2021/7/28 11:02 AM, 龙逸尘 wrote:
>> Hi comsir,
>> Computing the lag as the offsets from the Kafka cluster metadata minus the current group offsets is fairly accurate.
>> You need to maintain the group id yourself.
>>
>> comsir <609326...@qq.com.invalid> wrote on Tue, Jul 20, 2021 at 12:41 PM:
>>
>> hi all
>> For Flink jobs that use Kafka as the source, how do you all monitor the Kafka lag?
>> The purpose of monitoring this lag: 1. dashboard display, 2. alerting when the lag grows.
>> A few questions:
>> 1. Flink ships many related metrics out of the box, but after looking into them none seemed very accurate; which one do you use?
>> 2. How do you obtain the group id? If multiple groups are consuming, how do you tell them apart?
>> 3. Can the lag be computed from the Kafka cluster-side metadata minus the current offset?
>> 4. Is there an elegant way to implement this?
>> Many thanks, looking forward to your answers.
>>
>
>
>--
>
>*Best Regards*
>*Jeremy Mei*
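
For illustration, the "cluster end offsets minus the group's committed offsets"
approach discussed above could be sketched with the plain Kafka client API roughly
as follows (a sketch only; bootstrapServers and groupId are supplied by the caller
and kafka-clients is assumed to be on the classpath):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ConsumerLagSketch {
    // Lag per partition = latest broker offset - offset committed by the group.
    public static Map<TopicPartition, Long> lagForGroup(String bootstrapServers, String groupId)
            throws Exception {
        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", bootstrapServers);

        try (AdminClient admin = AdminClient.create(adminProps)) {
            // Offsets the consumer group has committed, per partition.
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(groupId)
                            .partitionsToOffsetAndMetadata()
                            .get();

            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", bootstrapServers);
            consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
            consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());

            Map<TopicPartition, Long> lag = new HashMap<>();
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
                // Latest (log end) offsets from the brokers for the same partitions.
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(committed.keySet());
                for (Map.Entry<TopicPartition, OffsetAndMetadata> e : committed.entrySet()) {
                    long end = endOffsets.getOrDefault(e.getKey(), 0L);
                    lag.put(e.getKey(), end - e.getValue().offset());
                }
            }
            return lag;
        }
    }
}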