Streaming File Sink: writing Parquet with the Avro bulk writer

2020-03-16 Thread 58683632
Question about writing Parquet files with the Streaming File Sink using the Avro bulk writer; the job is set up as:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(60 * 1000, CheckpointingMode.AT_LEAST_ONCE);
env.setStateBackend(new FsStateBackend(new Path("file:///g:/checkpoint")));
Schema schema = new Schema.Parser().parse("{\"namespace\":\"example.avro\",\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
final DataStreamSource
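
(The snippet above is cut off in the archive.) For reference, a minimal, hypothetical sketch of wiring a StreamingFileSink with a Parquet/Avro bulk writer -- assuming the flink-parquet and flink-avro modules are on the classpath; the output path, sample records and job structure are placeholders rather than the original poster's code:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class ParquetAvroSinkSketch {

    private static final String SCHEMA_STRING =
        "{\"namespace\":\"example.avro\",\"type\":\"record\",\"name\":\"User\","
        + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"},"
        + "{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},"
        + "{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bulk formats roll their part files on checkpoints, so checkpointing must be enabled.
        env.enableCheckpointing(60 * 1000);

        Schema schema = new Schema.Parser().parse(SCHEMA_STRING);

        DataStream<GenericRecord> users = env
            .fromElements("Alice", "Bob")
            .map(name -> {
                // Avro's Schema is not java.io.Serializable, so re-parse it inside the function.
                GenericRecord user = new GenericData.Record(new Schema.Parser().parse(SCHEMA_STRING));
                user.put("name", name);
                user.put("favorite_number", 7);
                user.put("favorite_color", "blue");
                return user;
            })
            // GenericRecord needs an explicit Avro type information, otherwise Kryo is used.
            .returns(new GenericRecordAvroTypeInfo(schema));

        StreamingFileSink<GenericRecord> sink = StreamingFileSink
            .forBulkFormat(new Path("file:///tmp/parquet-out"),            // placeholder output path
                           ParquetAvroWriters.forGenericRecord(schema))
            .build();

        users.addSink(sink);
        env.execute("streaming file sink parquet/avro sketch");
    }
}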

Re: Recommended batchSize for VectorizedRowBatch when reading ORC files

2020-03-16 Thread Jingsong Li
Hi,

10,000 rows is too large and will use too much memory; a batchSize that large is also bad for caching.
The batchSize does not have to match the row group size. When row groups are this large, a batchSize that is simply big enough is fine.

Best,
Jingsong Lee

On Tue, Mar 17, 2020 at 11:52 AM jun su  wrote:

> hi all:
>  When reading ORC files in a vectorized way, you need to configure the batchSize of VectorizedRowBatch,
> which sets how many rows are read at a time. As I understand it, based on the ORC index the smallest read
> unit of an ORC file should be a row group (10,000 rows by default), and the reader uses filter conditions to
> narrow down to specific row groups. If the batchSize mentioned earlier is set to 1000, a row group has to be
> read 10 times, and since each row group is stored column by column, non-contiguous reads are bound to occur,
> so wouldn't that prevent maximal optimization? Should the batchSize be set equal to the row group size to
> maximize read efficiency? I am not sure whether my understanding is correct.
>


-- 
Best, Jingsong Lee
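
To make the batchSize discussion concrete, here is a minimal sketch against the plain ORC core API (org.apache.orc); the file path and the 1024 batch size are placeholders. It only illustrates that batchSize bounds how many rows each nextBatch() call returns, independently of the row group size:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

public class OrcBatchSizeSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Reader reader = OrcFile.createReader(
            new Path("/tmp/data.orc"), OrcFile.readerOptions(conf));   // placeholder path

        // batchSize does not have to match the row group size (10,000 by default);
        // it only caps how many rows are materialized per nextBatch() call.
        int batchSize = 1024;
        VectorizedRowBatch batch = reader.getSchema().createRowBatch(batchSize);

        RecordReader rows = reader.rows();
        while (rows.nextBatch(batch)) {
            // up to batchSize rows are available here via batch.size and the column vectors
        }
        rows.close();
    }
}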


Recommended batchSize for VectorizedRowBatch when reading ORC files

2020-03-16 Thread jun su
hi all:
 When reading ORC files in a vectorized way, you need to configure the batchSize of VectorizedRowBatch,
which sets how many rows are read at a time. As I understand it, based on the ORC index the smallest read
unit of an ORC file should be a row group (10,000 rows by default), and the reader uses filter conditions to
narrow down to specific row groups. If the batchSize mentioned earlier is set to 1000, a row group has to be
read 10 times, and since each row group is stored column by column, non-contiguous reads are bound to occur,
so wouldn't that prevent maximal optimization? Should the batchSize be set equal to the row group size to
maximize read efficiency? I am not sure whether my understanding is correct.


Re: JobMaster does not register with ResourceManager in high availability setup

2020-03-16 Thread Xintong Song
Hi Abhinav,

I think you are right. The log confirms that the JobMaster has not tried to
connect to the ResourceManager. Most likely the JobMaster requested the RM address
but never received it.

I would suggest checking the ZK logs to see whether the request from the JM for the
RM address was received and properly responded to.

If you can easily reproduce this problem, and you are able to build Flink
from source, you can also try to insert more logging in Flink to further
confirm whether the RM address is received. I don't think that's necessary
though, since that code has not changed from Flink 1.7 through the latest
1.10, and I'm not aware of any reported issue where the JM does not try to
connect to the RM once the address is received.

Thank you~

Xintong Song



On Tue, Mar 17, 2020 at 7:45 AM Bajaj, Abhinav 
wrote:

> Hi Xintong,
>
>
>
> Apologies for delayed response. I was away for a week.
>
> I am attaching more jobmanager logs.
>
>
>
> To your point on the taskmanagers, the job is deployed with 20 parallelism
> but it has 22 TMs to have 2 of them as spare to assist in quick failover.
>
> I did check the logs, and all 22 task executors from those TMs get
> registered by 2020-02-27 06:35:47.050.
>
>
>
> You would notice that even after this time, the job fails with the error
> “org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots within timeout of 30 ms. Slots
> required: 201, slots allocated: 0” at 2020-02-27 06:40:36.778.
>
>
>
> Thanks a ton for your help.
>
>
>
> ~ Abhinav Bajaj
>
>
>
> *From: *Xintong Song 
> *Date: *Thursday, March 5, 2020 at 6:30 PM
> *To: *"Bajaj, Abhinav" 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: JobMaster does not register with ResourceManager in high
> availability setup
>
>
>
> Hi Abhinav,
>
>
>
> Thanks for the log. However, the attached log seems to be incomplete.
> The NoResourceAvailableException cannot be found in this log.
>
>
>
> Regarding connecting to ResourceManager, the log suggests that:
>
>- ZK was back to life and connected at 06:29:56.
>2020-02-27 06:29:56.539 [main-EventThread] level=INFO
> o.a.f.s.c.o.a.curator.framework.state.ConnectionStateManager  - State
>change: CONNECTED
>- RM registered to ZK and was granted leadership at 06:30:01.
>2020-02-27 06:30:01.677 [flink-akka.actor.default-dispatcher-5]
>level=INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>ResourceManager akka.tcp://flink@JOBMANAGER:6126/user/resourcemanager
>was granted leadership with fencing token a2c453481ea4e0c7722cab1e4dd741db
>- JM requests RM leader address from ZK at 06:30:06.
>2020-02-27 06:30:06.272 [flink-akka.actor.default-dispatcher-17]
>level=INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>- The RM leader address will be notified asynchronously, and only
>after that JM will try to connect to RM (printing the "Connecting to
>ResourceManager" log). The attached log ends in 100ms after JM requesting
>RM leader address, which is too short to tell whether the RM is connected
>properly.
>
> Another finding is about the TM registration. According to the log:
>
>- The parallelism of your job is 20, which means it needs 20 slots to
>be executed.
>- There are only 5 TMs registered. (Searching for "Registering
>TaskManager with ResourceID")
>- Assuming you have the same configurations for JM and TMs (this might
>not always be true), you have one slot per TM.
>599 2020-02-27 06:28:56.495 [main] level=INFO
> org.apache.flink.configuration.GlobalConfiguration  - Loading
>configuration property: taskmanager.numberOfTaskSlots, 1
>- That suggests that it is possible that not all the TaskExecutors are
>recovered/reconnected, leading to the NoResourceAvailableException. We
>would need the rest part of the log (from where the current one ends to
>the NoResourceAvailableException) to tell what happened during the
>scheduling. Also, could you confirm how many TMs do you use?
>
>
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Fri, Mar 6, 2020 at 5:55 AM Bajaj, Abhinav 
> wrote:
>
> Hi Xintong,
>
>
>
> Highly appreciate your assistance here.
>
> I am attaching the jobmanager log for reference.
>
>
>
> Let me share my quick responses on what you mentioned.
>
>
>
>
>
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot
> request, no ResourceManager connected.
>
> *XS: Sometimes you see this log because the ResourceManager is not yet
> connect when the slot request arrives the SlotPool. If the ResourceManager
> is connected later, the SlotPool will still send the pending slot requests,
> in that case you should find logs for SlotPool requesting slots from
> ResourceManager.*
>
>
>
> *AB*: Yes, I have noticed that behavior in scenarios where
> resourcemanager and 

Re: Flink YARN app terminated before the client receives the result

2020-03-16 Thread tison
edit: previously, after the cancellation we had a longer call chain to
#jobReachedGloballyTerminalState, which does the job archiving & graceful JM
shutdown; that might take some time, so that ...

Best,
tison.


tison  于2020年3月17日周二 上午10:13写道:

> Hi Weike & Till,
>
> I agree with Till, and it is also the analysis from my side. However, it
> seems that even without FLINK-15116, it is still possible that we
> complete the cancel future but the cluster gets shut down before it properly
> delivers the response.
>
> One strange thing is that this behavior is almost always reproducible, while it
> should be only one possible ordering, not a guaranteed one. Maybe previously we
> first had to cancel the job, which has a long call chain, so it happened that we
> had enough time to deliver the response.
>
> But the resolution looks like introducing some
> synchronization/finalization logic that clears these outstanding futures
> on a best-effort basis before the cluster (RestServer) goes down.
>
> Best,
> tison.
>
>
> Till Rohrmann  于2020年3月17日周二 上午4:12写道:
>
>> Hi Weike,
>>
>> could you share the complete logs with us? Attachments are being filtered
>> out by the Apache mail server but it works if you upload the logs somewhere
>> (e.g. https://gist.github.com/) and then share the link with us. Ideally
>> you run the cluster with DEBUG log settings.
>>
>> I assume that you are running Flink 1.10, right?
>>
>> My suspicion is that this behaviour has been introduced with FLINK-15116
>> [1]. It looks as if we complete the shutdown future in
>> MiniDispatcher#cancelJob before we return the response to the
>> RestClusterClient. My guess is that this triggers the shutdown of the
>> RestServer which then is not able to serve the response to the client. I'm
>> pulling in Aljoscha and Tison who introduced this change. They might be
>> able to verify my theory and propose a solution for it.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-15116
>>
>> Cheers,
>> Till
>>
>> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike 
>> wrote:
>>
>>> Hi Yangze and all,
>>>
>>> I have tried numerous times, and this behavior persists.
>>>
>>> Below is the tail log of taskmanager.log:
>>>
>>> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO
>>>  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Free slot
>>> TaskSlot(index:0, state:ACTIVE, resource profile:
>>> ResourceProfile{cpuCores=1., taskHeapMemory=1.503gb
>>> (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb
>>> (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
>>> allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
>>> d0a674795be98bd2574d9ea3286801cb).
>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove job
>>> d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot reconnect
>>> to job d0a674795be98bd2574d9ea3286801cb because it is not registered.
>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>>> SIGTERM. Shutting down as requested.
>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>>> SIGTERM. Shutting down as requested.
>>> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO
>>>  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down BLOB
>>> cache
>>> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle shutdown
>>> hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>>> FileChannelManager removed spill file directory
>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
>>> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager shutdown
>>> hook] INFO
>>>  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
>>> Shutting down TaskExecutorLocalStateStoresManager.
>>> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO
>>>  org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down BLOB
>>> cache
>>> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook] INFO
>>>  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>>> FileChannelManager removed spill file directory
>>> 

Re: Flink YARN app terminated before the client receives the result

2020-03-16 Thread tison
Hi Weike & Till,

I agree with Till, and it is also the analysis from my side. However, it
seems that even without FLINK-15116, it is still possible that we
complete the cancel future but the cluster gets shut down before it properly
delivers the response.

One strange thing is that this behavior is almost always reproducible, while it
should be only one possible ordering, not a guaranteed one. Maybe previously we
first had to cancel the job, which has a long call chain, so it happened that we
had enough time to deliver the response.

But the resolution looks like introducing some
synchronization/finalization logic that clears these outstanding futures
on a best-effort basis before the cluster (RestServer) goes down.

Best,
tison.


Till Rohrmann  于2020年3月17日周二 上午4:12写道:

> Hi Weike,
>
> could you share the complete logs with us? Attachments are being filtered
> out by the Apache mail server but it works if you upload the logs somewhere
> (e.g. https://gist.github.com/) and then share the link with us. Ideally
> you run the cluster with DEBUG log settings.
>
> I assume that you are running Flink 1.10, right?
>
> My suspicion is that this behaviour has been introduced with FLINK-15116
> [1]. It looks as if we complete the shutdown future in
> MiniDispatcher#cancelJob before we return the response to the
> RestClusterClient. My guess is that this triggers the shutdown of the
> RestServer which then is not able to serve the response to the client. I'm
> pulling in Aljoscha and Tison who introduced this change. They might be
> able to verify my theory and propose a solution for it.
>
> [1] https://issues.apache.org/jira/browse/FLINK-15116
>
> Cheers,
> Till
>
> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike 
> wrote:
>
>> Hi Yangze and all,
>>
>> I have tried numerous times, and this behavior persists.
>>
>> Below is the tail log of taskmanager.log:
>>
>> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Free slot
>> TaskSlot(index:0, state:ACTIVE, resource profile:
>> ResourceProfile{cpuCores=1., taskHeapMemory=1.503gb
>> (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb
>> (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
>> allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
>> d0a674795be98bd2574d9ea3286801cb).
>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove job
>> d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
>> connection for job d0a674795be98bd2574d9ea3286801cb.
>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
>> connection for job d0a674795be98bd2574d9ea3286801cb.
>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot reconnect
>> to job d0a674795be98bd2574d9ea3286801cb because it is not registered.
>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>> SIGTERM. Shutting down as requested.
>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>> SIGTERM. Shutting down as requested.
>> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO
>>  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down BLOB
>> cache
>> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle shutdown
>> hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>> FileChannelManager removed spill file directory
>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
>> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager shutdown
>> hook] INFO
>>  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
>> Shutting down TaskExecutorLocalStateStoresManager.
>> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO
>>  org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down BLOB
>> cache
>> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook] INFO
>>  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>> FileChannelManager removed spill file directory
>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-io-67ad5c3a-aec6-42be-ab1f-0ce3841fc4bd
>> 2020-03-13 12:06:19.752 [FileCache shutdown hook] INFO
>>  org.apache.flink.runtime.filecache.FileCache  - removed file cache
>> directory
>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-dist-cache-65075ee3-e009-4978-a9d8-ec010e6f4b31
>>
>> As the tail log of 

Re: Flink Conf "yarn.flink-dist-jar" Question

2020-03-16 Thread Yang Wang
Hi Hailu,

Sorry for the late response. If the Flink cluster (e.g. the Yarn application) is
stopped directly by `yarn application -kill`, then the staging directory will be
left behind, since the jobmanager does not get any chance to clean up the staging
directory. It may also happen when the jobmanager crashes and reaches the attempt
limit of Yarn.

For FLINK-13938, yes, it is trying to use the Yarn public cache to accelerate the
container launch.


Best,
Yang

Hailu, Andreas  于2020年3月10日周二 上午4:38写道:

> Also may I ask what causes these application ID directories to be left
> behind? Is it a job failure, or can they persist even if the application
> succeeds? I’d like to know so that I can implement my own cleanup in the
> interim to prevent exceeding user disk space quotas.
>
>
>
> *// *ah
>
>
>
> *From:* Hailu, Andreas [Engineering]
> *Sent:* Monday, March 9, 2020 1:20 PM
> *To:* 'Yang Wang' 
> *Cc:* tison ; user@flink.apache.org
> *Subject:* RE: Flink Conf "yarn.flink-dist-jar" Question
>
>
>
> Hi Yang,
>
>
>
> Yes, a combination of these two would be very helpful for us. We have a
> single shaded binary which we use to run all of the jobs on our YARN
> cluster. If we could designate a single location in HDFS for that as well,
> we could also greatly benefit from FLINK-13938.
>
>
>
> It sounds like a general public cache solution is what’s being called for?
>
>
>
> *// *ah
>
>
>
> *From:* Yang Wang 
> *Sent:* Sunday, March 8, 2020 10:52 PM
> *To:* Hailu, Andreas [Engineering] 
> *Cc:* tison ; user@flink.apache.org
> *Subject:* Re: Flink Conf "yarn.flink-dist-jar" Question
>
>
>
> Hi Hailu, tison,
>
>
>
> I created a very similar ticket before to accelerate Flink submission on
> Yarn [1]. However, we did not reach a consensus in the PR. Maybe it's time to
> revive the discussion and try to find a common solution for both tickets [1][2].
>
>
>
>
>
> [1]. https://issues.apache.org/jira/browse/FLINK-13938
> 
>
> [2]. https://issues.apache.org/jira/browse/FLINK-14964
> 
>
>
>
>
>
> Best,
>
> Yang
>
>
>
> Hailu, Andreas  于2020年3月7日周六 上午11:21写道:
>
> Hi Tison, thanks for the reply. I’ve replied to the ticket. I’ll be
> watching it as well.
>
>
>
> *// *ah
>
>
>
> *From:* tison 
> *Sent:* Friday, March 6, 2020 1:40 PM
> *To:* Hailu, Andreas [Engineering] 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink Conf "yarn.flink-dist-jar" Question
>
>
>
> FLINK-13938 seems a bit different from your requirement. The one that totally
> matches is FLINK-14964.
> I'd appreciate it if you can share your opinion on the JIRA ticket.
>
>
>
> Best,
>
> tison.
>
>
>
>
>
> tison  于2020年3月7日周六 上午2:35写道:
>
> Yes, your requirement has been taken into consideration by the community.
> We currently have an open JIRA ticket for this specific feature [1], and work
> on loosening the constraint of the flink-jar schema to support DFS locations
> should happen.
>
>
>
> Best,
>
> tison.
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-13938
> 
>
>
>
>
>
> Hailu, Andreas  于2020年3月7日周六 上午2:03写道:
>
> Hi,
>
>
>
> We noticed that every time an application runs, it uploads the flink-dist
> artifact to the /user//.flink HDFS directory. This causes a user disk
> space quota issue as we submit thousands of apps to our cluster an hour. We
> had a similar problem with our Spark applications where it uploaded the
> Spark Assembly package for every app. Spark provides an argument to use a
> location in HDFS its for applications to leverage so they don’t need to
> upload them for every run, and that was our solution (see “spark.yarn.jar”
> configuration if interested.)
>
>
>
> Looking at the Resource Orchestration Frameworks page
> 

Re: Issues with Watermark generation after join

2020-03-16 Thread Kurt Young
Hi, could you share the SQL you wrote for your original purpose, not the
one to which you attached the ProcessFunction for debugging?

Best,
Kurt


On Tue, Mar 17, 2020 at 3:08 AM Dominik Wosiński  wrote:

> Actually, I just put this process function there for debugging purposes.
> My main goal is to join the E & C using the Temporal Table function, but I
> have observed exactly the same behavior i.e. when the parallelism was > 1
> there was no output and when I was setting it to 1 then the output was
> generated. So, I have switched to process function to see whether the
> watermarks are reaching this stage.
>
> Best Regards,
> Dom.
>
> pon., 16 mar 2020 o 19:46 Theo Diefenthal <
> theo.diefent...@scoop-software.de> napisał(a):
>
>> Hi Dominik,
>>
>> I had the same issue once with a custom process function. My process function
>> buffered the data for a while and then output it again. As the process
>> function can do anything with the data (transforming, buffering,
>> aggregating...), I think it's just not safe for Flink to reason about the
>> watermark of the output.
>>
>> I solved all my issues by calling `assignTimestampsAndWatermarks`
>> directly after the (co-)process function.
>>
>> Best regards
>> Theo
>>
>> --
>> *Von: *"Dominik Wosiński" 
>> *An: *"user" 
>> *Gesendet: *Montag, 16. März 2020 16:55:18
>> *Betreff: *Issues with Watermark generation after join
>>
>> Hey,
>> I have noticed a weird behavior with a job that I am currently working
>> on. I have 4 different streams from Kafka, lets call them A, B, C and D.
>> Now the idea is that first I do SQL Join of A & B based on some field, then
>> I create append stream from Joined A, let's call it E. Then I need to
>> assign timestamps to E since it is a result of joining and Flink can't
>> figure out the timestamps.
>>
>> Next, I union E & C, to create some F stream. Then finally I connect E &
>> C using `keyBy` and CoProcessFunction. Now the issue I am facing is that
>> it works fine if I enforce the parallelism of E to be 1 by
>> invoking *setParallelism*. But if parallelism is higher than 1, for the
>> same data - the watermark is not progressing correctly. I can see that 
>> *CoProcessFunction
>> *methods are invoked and that data is produced, but the Watermark is
>> never progressing for this function. What I can see is that watermark is
>> always equal to (0 - allowedOutOfOrderness). I can see that timestamps are
>> correctly extracted and when I add debug prints I can actually see that
>> Watermarks are generated for all streams, but for some reason, if the
>> parallelism is > 1 they will never progress up to connect function. Is
>> there anything that needs to be done after SQL joins that I don't know of
>> ??
>>
>> Best Regards,
>> Dom.
>>
>


Re: Very large _metadata file

2020-03-16 Thread Jacob Sevart
Thanks! That would do it. I've disabled the operator for now.

The purpose was to know the age of the job's state, so that we could
consider its output in terms of how much context it knows. Regular state
seemed insufficient because partitions might see their first traffic at
different times.

How would you go about implementing something like that?

On Mon, Mar 16, 2020 at 1:54 PM Till Rohrmann  wrote:

> Hi Jacob,
>
> I think you are running into some deficiencies of Flink's union state
> here. The problem is that for every entry in your list state, Flink stores
> a separate offset (a long value). The reason for this behaviour is that we
> use the same state implementation for the union state as well as for the
> split state. For the latter, the offset information is required to split
> the state in case of changing the parallelism of your job.
>
> My recommendation would be to try to get rid of union state all together.
> The union state has primarily been introduced to checkpoint some source
> implementations and might become deprecated due to performance problems
> once these sources can be checkpointed differently.
>
> Cheers,
> Till
>
> On Sat, Mar 14, 2020 at 3:23 AM Jacob Sevart  wrote:
>
>> Oh, I should clarify that's 43MB per partition, so with 48 partitions it
>> explains my 2GB.
>>
>> On Fri, Mar 13, 2020 at 7:21 PM Jacob Sevart  wrote:
>>
>>> Running *Checkpoints.loadCheckpointMetadata *under a debugger, I found
>>> something:
>>> *subtaskState.managedOperatorState[0].sateNameToPartitionOffsets("startup-times").offsets.value
>>>  *weights
>>> 43MB (5.3 million longs).
>>>
>>> "startup-times" is an operator state of mine (union list of
>>> java.time.Instant). I see a way to end up with fewer items in the list, but I'm
>>> not sure how the actual size is related to the number of offsets. Can you
>>> elaborate on that?
>>>
>>> Incidentally, 42.5MB is the number I got out of
>>> https://issues.apache.org/jira/browse/FLINK-14618
>>> .
>>> So I think my two problems are closely related.
>>>
>>> Jacob
>>>
>>> On Mon, Mar 9, 2020 at 6:36 AM Congxian Qiu 
>>> wrote:
>>>
 Hi

 As Gordon said, the metadata will contain the ByteStreamStateHandle,
 when writing out the ByteStreamStateHandle, will write out the handle name
 -- which is a path (as you saw). The ByteStreamStateHandle will be created
 when the state size is smaller than `state.backend.fs.memory-threshold` (default
 is 1024).

 If you want to verify this, you can ref the unit test
 `CheckpointMetadataLoadingTest#testLoadAndValidateSavepoint` and load the
 metadata, you can find out that there are many `ByteStreamStateHandle`, and
 their names are the strings you saw in the metadata.

 Best,
 Congxian


 Jacob Sevart  于2020年3月6日周五 上午3:57写道:

> Thanks, I will monitor that thread.
>
> I'm having a hard time following the serialization code, but if you
> know anything about the layout, tell me if this makes sense. What I see in
> the hex editor is, first, many HDFS paths. Then gigabytes of unreadable
> data. Then finally another HDFS path at the end.
>
> If it is putting state in there, under normal circumstances, does it
> make sense that it would be interleaved with metadata? I would expect all
> the metadata to come first, and then state.
>
> Jacob
>
>
>
> Jacob
>
> On Thu, Mar 5, 2020 at 10:53 AM Kostas Kloudas 
> wrote:
>
>> Hi Jacob,
>>
>> As I said previously I am not 100% sure what can be causing this
>> behavior, but this is a related thread here:
>>
>> https://urldefense.proofpoint.com/v2/url?u=https-3A__lists.apache.org_thread.html_r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d-2540-253Cuser.flink.apache.org-253E=DwIBaQ=r2dcLCtU9q6n0vrtnDw9vg=lTq5mEceM-U-tVfWzKBngg=awEv6FqKY6dZ8NIA4KEFc_qQ6aadR_jTAWnO17wtAus=P3Xd0IFKJTDIG2MMeP-hOSfY4ohoCEUMQEJhvGecSlI=
>>
>> Which you can re-post your problem and monitor for answers.
>>
>> Cheers,
>> Kostas
>>
>> On Wed, Mar 4, 2020 at 7:02 PM Jacob Sevart  wrote:
>> >
>> > Kostas and Gordon,
>> >
>> > Thanks for the suggestions! I'm on RocksDB. We don't have that
>> setting configured so it should be at the default 1024b. This is the full
>> "state.*" section showing in the JobManager UI.
>> >
>> >
>> >
>> > Jacob
>> >
>> > On Wed, Mar 4, 2020 at 2:45 AM Tzu-Li (Gordon) Tai <
>> tzuli...@apache.org> wrote:
>> >>
>> >> Hi Jacob,
>> >>
>> >> Apart from what Klou already mentioned, one slightly possible
>> reason:
>> >>
>> >> If you are using the FsStateBackend, it is also possible that 

Re: Flink on Kubernetes Vs Flink Natively on Kubernetes

2020-03-16 Thread Yang Wang
Hi Pankaj,

Just like Xintong said, the biggest difference between Flink on Kubernetes and
the native integration is dynamic resource allocation, since the latter has an
embedded K8s client and communicates with the K8s API server directly to
allocate/release JM/TM pods.

For both ways of running Flink on K8s, you do not need to reserve the whole
cluster for Flink. Flink can run alongside other workloads (e.g. Spark,
TensorFlow, etc.), and the K8s cluster guarantees the isolation.


Best,
Yang

Pankaj Chand  于2020年3月16日周一 下午5:51写道:

> Hi Xintong,
>
> Thank you for the explanation!
>
> If I run Flink "natively" on Kubernetes, will I also be able to run Spark
> on the same Kubernetes cluster, or will it make the Kubernetes cluster be
> reserved for Flink only?
>
> Thank you!
>
> Pankaj
>
> On Mon, Mar 16, 2020 at 5:41 AM Xintong Song 
> wrote:
>
>> Forgot to mention that "running Flink natively on Kubernetes" is newly
>> introduced and is only available for Flink 1.10 and above.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Mon, Mar 16, 2020 at 5:40 PM Xintong Song 
>> wrote:
>>
>>> Hi Pankaj,
>>>
>>> "Running Flink on Kubernetes" refers to the old way that basically
>>> deploys a Flink standalone cluster on Kubernetes. We leverage scripts to
>>> run Flink Master and TaskManager processes inside Kubernetes containers. In
>>> this way, Flink is not aware of whether it's running in containers or
>>> directly on physical machines, and will not interact with the Kubernetes
>>> Master. The Flink Master reactively accepts all registered TaskManagers, whose
>>> number is decided by the Kubernetes replica count.
>>>
>>> "Running Flink natively on Kubernetes" refers to deploying Flink as a
>>> Kubernetes Job. The Flink Master will interact with the Kubernetes Master, and
>>> actively requests pods/containers, like on Yarn/Mesos.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Mon, Mar 16, 2020 at 4:03 PM Pankaj Chand 
>>> wrote:
>>>
 Hi all,

 I want to run Flink, Spark and other processing engines on a single
 Kubernetes cluster.

 From the Flink documentation, I did not understand the difference
 between:
 (1) Running Flink on Kubernetes, Versus (2) Running Flink natively on
 Kubernetes.

 Could someone please explain the difference between the two, and when
 would you use which option?

 Thank you,

 Pankaj

>>>


Re: datadog metrics

2020-03-16 Thread Fanbin Bu
Hi Steve,

could you please share your workaround solution in more detail in the
above ticket?

Thanks,
Fanbin

On Mon, Mar 16, 2020 at 2:50 AM Chesnay Schepler  wrote:

> I've created https://issues.apache.org/jira/browse/FLINK-16611.
>
> @Steve Any chance you could contribute your changes, or some insight on
> what would need to be changed?
>
> On 11/03/2020 23:16, Steve Whelan wrote:
>
> Hi Fabian,
>
> We ran into the same issue. We modified the reporter to emit the metrics
> in chunks and it worked fine after. Would be interested in seeing a ticket
> on this as well.
>
> - Steve
>
> On Wed, Mar 11, 2020 at 5:13 AM Chesnay Schepler 
> wrote:
>
>> Please open a JIRA; we may have to split the datadog report into several
>> chunks.
>>
>> On 09/03/2020 07:47, Fanbin Bu wrote:
>>
>> quote from the following link:
>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-Query-named-operator-exceeds-80-characters-td24807.html#a24818
>>
>> "This is a safeguard in the metric system to prevent extremely long names
>> (as these could cause the reporting to fail); so long as the prefix is
>> unique you can safely ignore this warning."
>>
>> I do see from log that my sql operator name is too long and says it's
>> truncated.
>> But i still failed to report to datadog.
>>
>> Thanks
>> Fanbin
>>
>> On Sun, Mar 8, 2020 at 11:36 PM Fanbin Bu  wrote:
>>
>>> Hi,
>>>
>>> Has anybody seen this error before and what is the suggested way to
>>> solve it?
>>>
>>> 2020-03-07 02:54:34,100 WARN
>>>  org.apache.flink.metrics.datadog.DatadogHttpClient- Failed to
>>> send request to Datadog (response was Response{protocol=http/1.1, code=413,
>>> message=Request Entity Too Large, url=
>>> https://app.datadoghq.com/api/v1/series?api_key=
>>>
>>> thanks,
>>> Fanbin
>>>
>>
>>
>


Re: Very large _metadata file

2020-03-16 Thread Till Rohrmann
Hi Jacob,

I think you are running into some deficiencies of Flink's union state here.
The problem is that for every entry in your list state, Flink stores a
separate offset (a long value). The reason for this behaviour is that we
use the same state implementation for the union state as well as for the
split state. For the latter, the offset information is required to split
the state in case of changing the parallelism of your job.

My recommendation would be to try to get rid of union state all together.
The union state has primarily been introduced to checkpoint some source
implementations and might become deprecated due to performance problems
once these sources can be checkpointed differently.

Cheers,
Till
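
For readers unfamiliar with the state being discussed, here is a minimal sketch of declaring a union-list operator state via CheckpointedFunction. This is not the original job's code; the state name mirrors the "startup-times" state mentioned in this thread, with Long epoch millis standing in for java.time.Instant:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class StartupTimeTracker<T> extends ProcessFunction<T, T> implements CheckpointedFunction {

    private transient ListState<Long> startupTimes;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Union list state: on restore, every subtask receives the union of all subtasks' lists,
        // and every list entry is tracked with its own offset (a long) in the checkpoint metadata.
        startupTimes = context.getOperatorStateStore()
            .getUnionListState(new ListStateDescriptor<>("startup-times", Long.class));
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Only record a startup time if nothing was restored; letting every restart append new
        // entries is exactly what makes such a list (and the _metadata file) grow without bound.
        if (!startupTimes.get().iterator().hasNext()) {
            startupTimes.add(System.currentTimeMillis());
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Nothing to do; the list state is snapshotted automatically on each checkpoint.
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        out.collect(value);
    }
}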

On Sat, Mar 14, 2020 at 3:23 AM Jacob Sevart  wrote:

> Oh, I should clarify that's 43MB per partition, so with 48 partitions it
> explains my 2GB.
>
> On Fri, Mar 13, 2020 at 7:21 PM Jacob Sevart  wrote:
>
>> Running *Checkpoints.loadCheckpointMetadata *under a debugger, I found
>> something:
>> *subtaskState.managedOperatorState[0].sateNameToPartitionOffsets("startup-times").offsets.value
>>  *weights
>> 43MB (5.3 million longs).
>>
>> "startup-times" is an operator state of mine (union list of
>> java.time.Instant). I see a way to end up with fewer items in the list, but I'm
>> not sure how the actual size is related to the number of offsets. Can you
>> elaborate on that?
>>
>> Incidentally, 42.5MB is the number I got out of
>> https://issues.apache.org/jira/browse/FLINK-14618. So I think my two
>> problems are closely related.
>>
>> Jacob
>>
>> On Mon, Mar 9, 2020 at 6:36 AM Congxian Qiu 
>> wrote:
>>
>>> Hi
>>>
>>> As Gordon said, the metadata will contain the ByteStreamStateHandle,
>>> when writing out the ByteStreamStateHandle, will write out the handle name
>>> -- which is a path (as you saw). The ByteStreamStateHandle will be created
>>> when the state size is smaller than `state.backend.fs.memory-threshold` (default
>>> is 1024).
>>>
>>> If you want to verify this, you can ref the unit test
>>> `CheckpointMetadataLoadingTest#testLoadAndValidateSavepoint` and load the
>>> metadata, you can find out that there are many `ByteStreamStateHandle`, and
>>> their names are the strings you saw in the metadata.
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> Jacob Sevart  于2020年3月6日周五 上午3:57写道:
>>>
 Thanks, I will monitor that thread.

 I'm having a hard time following the serialization code, but if you
 know anything about the layout, tell me if this makes sense. What I see in
 the hex editor is, first, many HDFS paths. Then gigabytes of unreadable
 data. Then finally another HDFS path at the end.

 If it is putting state in there, under normal circumstances, does it
 make sense that it would be interleaved with metadata? I would expect all
 the metadata to come first, and then state.

 Jacob



 Jacob

 On Thu, Mar 5, 2020 at 10:53 AM Kostas Kloudas 
 wrote:

> Hi Jacob,
>
> As I said previously I am not 100% sure what can be causing this
> behavior, but this is a related thread here:
>
> https://urldefense.proofpoint.com/v2/url?u=https-3A__lists.apache.org_thread.html_r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d-2540-253Cuser.flink.apache.org-253E=DwIBaQ=r2dcLCtU9q6n0vrtnDw9vg=lTq5mEceM-U-tVfWzKBngg=awEv6FqKY6dZ8NIA4KEFc_qQ6aadR_jTAWnO17wtAus=P3Xd0IFKJTDIG2MMeP-hOSfY4ohoCEUMQEJhvGecSlI=
>
> Which you can re-post your problem and monitor for answers.
>
> Cheers,
> Kostas
>
> On Wed, Mar 4, 2020 at 7:02 PM Jacob Sevart  wrote:
> >
> > Kostas and Gordon,
> >
> > Thanks for the suggestions! I'm on RocksDB. We don't have that
> setting configured so it should be at the default 1024b. This is the full
> "state.*" section showing in the JobManager UI.
> >
> >
> >
> > Jacob
> >
> > On Wed, Mar 4, 2020 at 2:45 AM Tzu-Li (Gordon) Tai <
> tzuli...@apache.org> wrote:
> >>
> >> Hi Jacob,
> >>
> >> Apart from what Klou already mentioned, one slightly possible
> reason:
> >>
> >> If you are using the FsStateBackend, it is also possible that your
> state is small enough to be considered to be stored inline within the
> metadata file.
> >> That is governed by the "state.backend.fs.memory-threshold"
> configuration, with a default value of 1024 bytes, or can also be
> configured with the `fileStateSizeThreshold` argument when constructing 
> the
> `FsStateBackend`.
> >> The purpose of that threshold is to ensure that the backend does
> not create a large amount of very small files, where potentially the file
> pointers are actually larger than the state itself.
> >>
> >> Cheers,
> >> Gordon
> >>
> >>
> >>
> >> On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas 
> wrote:
> >>>
> >>> Hi Jacob,
> >>>
> >>> Could you specify which 

Re: Cancel the flink task and restore from checkpoint ,can I change the flink operator's parallelism

2020-03-16 Thread Till Rohrmann
If you want to change the max parallelism then you need to take a savepoint
and use Flink's state processor API [1] to rewrite the max parallelism by
creating a new savepoint from the old one.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

Cheers,
Till
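
A heavily simplified sketch of that approach with the State Processor API (flink-state-processor-api); the operator uid, savepoint paths, the new max parallelism of 512 and the single ValueState<Long> are placeholders -- a real job would have to read and re-bootstrap the state of every stateful operator:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class RewriteMaxParallelism {

    // Reads a per-key ValueState<Long> named "count" out of the old savepoint.
    static class CountReader extends KeyedStateReaderFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            out.collect(Tuple2.of(key, count.value()));
        }
    }

    // Writes the same ValueState<Long> back for each key when bootstrapping the new savepoint.
    static class CountBootstrapper extends KeyedStateBootstrapFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx) throws Exception {
            count.update(value.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        MemoryStateBackend backend = new MemoryStateBackend();

        // 1) Read the keyed state of the operator with uid "my-operator" from the old savepoint.
        ExistingSavepoint old = Savepoint.load(env, "hdfs:///savepoints/savepoint-old", backend);
        DataSet<Tuple2<String, Long>> counts = old.readKeyedState("my-operator", new CountReader());

        // 2) Bootstrap that state into a new savepoint created with the desired max parallelism.
        BootstrapTransformation<Tuple2<String, Long>> transformation =
            OperatorTransformation.bootstrapWith(counts)
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) {
                        return value.f0;
                    }
                })
                .transform(new CountBootstrapper());

        Savepoint.create(backend, 512)                       // 512 = new max parallelism (placeholder)
            .withOperator("my-operator", transformation)
            .write("hdfs:///savepoints/savepoint-new");

        env.execute("rewrite max parallelism");
    }
}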

On Sat, Mar 14, 2020 at 4:07 AM LakeShen  wrote:

> Hi Eleanore , if you resume from savepoint , you can't change the flink
> operator's max parallelism .
>
> Eleanore Jin  于2020年3月14日周六 上午12:51写道:
>
> > Hi Piotr,
> > Does this also apply to savepoint? (meaning the max parallelism should
> not
> > change for job resume from savepoint?)
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Fri, Mar 13, 2020 at 6:33 AM Piotr Nowojski 
> > wrote:
> >
> > > Hi,
> > >
> > > Yes, you can change the parallelism. One thing that you can not change
> is
> > > “max parallelism”.
> > >
> > > Piotrek
> > >
> > > > On 13 Mar 2020, at 04:34, Sivaprasanna 
> > > wrote:
> > > >
> > > > I think you can modify the operator’s parallelism. It is only if you
> > > have set maxParallelism, and while restoring from a checkpoint, you
> > > shouldn’t modify the maxParallelism. Otherwise, I believe the state
> will
> > be
> > > lost.
> > > >
> > > > -
> > > > Sivaprasanna
> > > >
> > > > On Fri, 13 Mar 2020 at 9:01 AM, LakeShen  > > > wrote:
> > > > Hi community,
> > > >   My question is: if I cancel the Flink task and retain the
> > > > checkpoint dir, then restore from that checkpoint dir, can I change the
> > > > Flink operator's parallelism? In my understanding I can't change the
> > > > operator's parallelism, but I am not sure.
> > > >  Thanks to your reply.
> > > >
> > > > Best wishes,
> > > > LakeShen
> > >
> > >
> >
>


Re: Flink YARN app terminated before the client receives the result

2020-03-16 Thread Till Rohrmann
Hi Weike,

could you share the complete logs with us? Attachments are being filtered
out by the Apache mail server but it works if you upload the logs somewhere
(e.g. https://gist.github.com/) and then share the link with us. Ideally
you run the cluster with DEBUG log settings.

I assume that you are running Flink 1.10, right?

My suspicion is that this behaviour has been introduced with FLINK-15116
[1]. It looks as if we complete the shutdown future in
MiniDispatcher#cancelJob before we return the response to the
RestClusterClient. My guess is that this triggers the shutdown of the
RestServer which then is not able to serve the response to the client. I'm
pulling in Aljoscha and Tison who introduced this change. They might be
able to verify my theory and propose a solution for it.

[1] https://issues.apache.org/jira/browse/FLINK-15116

Cheers,
Till

On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike  wrote:

> Hi Yangze and all,
>
> I have tried numerous times, and this behavior persists.
>
> Below is the tail log of taskmanager.log:
>
> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO
>  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Free slot
> TaskSlot(index:0, state:ACTIVE, resource profile:
> ResourceProfile{cpuCores=1., taskHeapMemory=1.503gb
> (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb
> (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
> allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
> d0a674795be98bd2574d9ea3286801cb).
> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove job
> d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
> connection for job d0a674795be98bd2574d9ea3286801cb.
> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
> connection for job d0a674795be98bd2574d9ea3286801cb.
> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot reconnect
> to job d0a674795be98bd2574d9ea3286801cb because it is not registered.
> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
> SIGTERM. Shutting down as requested.
> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
> SIGTERM. Shutting down as requested.
> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO
>  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down BLOB
> cache
> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle shutdown
> hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
> FileChannelManager removed spill file directory
> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager shutdown
> hook] INFO
>  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
> Shutting down TaskExecutorLocalStateStoresManager.
> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO
>  org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down BLOB
> cache
> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook] INFO
>  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
> FileChannelManager removed spill file directory
> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-io-67ad5c3a-aec6-42be-ab1f-0ce3841fc4bd
> 2020-03-13 12:06:19.752 [FileCache shutdown hook] INFO
>  org.apache.flink.runtime.filecache.FileCache  - removed file cache
> directory
> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-dist-cache-65075ee3-e009-4978-a9d8-ec010e6f4b31
>
> As the tail log of jobmanager.log is kind of lengthy, I have attached it
> in this mail.
>
> From what I have seen, the TaskManager and JobManager shut down by
> themselves, however, I have noticed some Netty exceptions (from the stack
> trace, it is part of the REST handler) like:
>
> ERROR
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>  - Failed to submit a listener notification task. Event loop shut down?
> java.util.concurrent.RejectedExecutionException: event executor terminated
>
> Thus I suppose that these exceptions might be the actual cause of
> premature termination of the REST server, and I am still looking into the
> real cause of this.
>
> Best,
> Weike
>
> On Fri, Mar 13, 2020 at 1:45 PM Yangze Guo  wrote:
>
>> Would you mind to share more information about why the task executor
>> is killed? If it is 

Re: how to specify yarnqueue when starting a new job programmatically?

2020-03-16 Thread Till Rohrmann
Hi Vitaliy,

in the case of a session cluster you cannot influence the queue
programmatically since Flink uses the value configured
via yarn.application.queue which is read from the flink-conf.yaml.

However, there is a way to influence the yarn queue programmatically if you
use the per job mode. What you need to do is to create a
StreamExecutionEnvironment manually by passing a Configuration instance. In
this configuration instance you can set the respective configuration key to
the desired yarn queue value. Please be aware that with this approach, you
won't load Flink's global configuration specified in flink-conf.yaml.

Cheers,
Till
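
A minimal sketch of this suggestion; "myQueue" is a placeholder queue name, and the deployment target (e.g. per-job on YARN) still has to be chosen at submission time:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class QueueAwareJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("yarn.application.queue", "myQueue");   // placeholder queue name

        // Note: flink-conf.yaml is NOT loaded when the environment is constructed this way.
        StreamExecutionEnvironment env = new StreamExecutionEnvironment(conf);

        env.fromElements(1, 2, 3).print();
        env.execute("queue-aware job");
    }
}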

On Thu, Mar 12, 2020 at 12:02 PM Xintong Song  wrote:

> Do you mean in the job java code, where you call “env.execute()”?
>
> I don't think so. The yarn queue is a Flink cluster level configuration,
> and I don't think you can change the cluster level configurations in your
> job code. Your job might even be created after the cluster is started, e.g.,
> in a Flink YARN Session.[1]
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#flink-yarn-session
>
> On Thu, Mar 12, 2020 at 6:20 PM Vitaliy Semochkin 
> wrote:
>
>> Thank you Xintong Song,
>>
>> is there any way to queue programmatically, i.e. via java code?
>>
>> Regards,
>> Vitaliy
>>
>> On Thu, Mar 12, 2020 at 5:56 AM Xintong Song 
>> wrote:
>>
>>> Hi Vitaliy,
>>>
>>> You can specify a yarn queue by either setting the configuration option
>>> 'yarn.application.queue' [1], or using the command line option '-qu' (or
>>> '--queue') [2].
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#yarn-application-queue
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#start-a-session
>>>
>>> On Thu, Mar 12, 2020 at 3:56 AM Vitaliy Semochkin 
>>> wrote:
>>>
 Hi,

 How can I specify a yarn queue when I start a new job programmatically?

 Regards,
 Vitaliy

>>>


Re: Flink gelly dependency in transient EMR cluster

2020-03-16 Thread Till Rohrmann
Alternatively, you could also bundle the Gelly dependency with your user
code jar by creating an uber jar. The downside of this approach would be an
increased jar size which needs to be uploaded to the cluster.

Cheers,
Till

On Thu, Mar 12, 2020 at 4:13 PM Antonio Martínez Carratalá <
amarti...@alto-analytics.com> wrote:

> I'm replying to myself with the solution in case someone else has the
> same question.
>
> It is only needed to add a copy command to copy the jar from flink/opt to
> flink/lib, in my case:
>
> StepConfig addGellyStep = new StepConfig()
> .withName("add-gelly-step")
> .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
> .withArgs("bash", "-c", "sudo cp
> /usr/lib/flink/opt/flink-gelly_2.11-1.8.0.jar /usr/lib/flink/lib"));
>
>
>
> On Thu, Mar 12, 2020 at 9:43 AM Antonio Martínez Carratalá <
> amarti...@alto-analytics.com> wrote:
>
>> Hello,
>>
>> I'm trying to run a flink job that works with graphs in a transient
>> cluster in EMR, here is my code:
>>
>> --
>> HadoopJarStepConfig copyJarStepConf = new HadoopJarStepConfig()
>> .withJar("command-runner.jar")
>> .withArgs("bash", "-c", "aws s3 cp s3://" + BUCKET_NAME +
>> "/pugore-flink.jar /home/hadoop/pugore-flink.jar");
>>
>> StepConfig copyJarStep = new StepConfig()
>> .withName("Copy Jar")
>> .withHadoopJarStep(copyJarStepConf);
>>
>> HadoopJarStepConfig flinkJobConf = new HadoopJarStepConfig()
>> .withJar("command-runner.jar")
>> .withArgs("bash", "-c", "flink run -m yarn-cluster -yn 1"
>> + " --class
>> es.pugore.flink.job.centrality.CentralityJob /home/hadoop/pugore-flink.jar"
>> + " --alpha 0.05"
>> + " --iterations 50"
>> + " --input s3://" + BUCKET_NAME + "/" + key +
>> "/edges.csv"
>> + " --output s3://" + BUCKET_NAME + "/" + key +
>> "/vertices-centrality.csv");
>>
>> StepConfig flinkRunJobStep = new StepConfig()
>> .withName("Flink job")
>> .withActionOnFailure("CONTINUE")
>> .withHadoopJarStep(flinkJobConf);
>>
>> List stepConfigs = new ArrayList<>();
>> stepConfigs.add(copyJarStep);
>> stepConfigs.add(flinkRunJobStep);
>>
>> Application flink = new Application().withName("Flink");
>>
>> String clusterName = "flink-job-" + key;
>> RunJobFlowRequest request = new RunJobFlowRequest()
>> .withName(clusterName)
>> .withReleaseLabel("emr-5.26.0")
>> .withApplications(flink)
>> .withServiceRole("EMR_DefaultRole")
>> .withJobFlowRole("EMR_EC2_DefaultRole")
>> .withLogUri("s3://" + BUCKET_NAME + "/" + key + "/logs")
>> .withInstances(new JobFlowInstancesConfig()
>> .withInstanceCount(2)
>> .withKeepJobFlowAliveWhenNoSteps(false)
>> .withMasterInstanceType("m4.large")
>> .withSlaveInstanceType("m4.large"))
>> .withSteps(stepConfigs);
>>
>> RunJobFlowResult result = getEmrClient().runJobFlow(request);
>> String clusterId = result.getJobFlowId();
>>
>> log.debug("[" + key + "] cluster created with id: " + clusterId);
>> -
>>
>> This job creates the cluster from scratch and launches my job, it is
>> executed but I'm getting the following error:
>>
>> Caused by: java.lang.NoClassDefFoundError:
>> org/apache/flink/graph/GraphAlgorithm
>>
>> In my local cluster I copy the flink-gelly jar from flink/opt to
>> flink/lib and it works. Is there any way to do this automatically in a
>> transient EMR cluster before launching the job?
>>
>> I know I can put the jar in S3 and copy it from there as I do with my jar
>> in the first step and then use it as classpath, but I'm wondering if it is
>> possible to instruct EMR to include that dependency in some way, maybe with
>> some option in Application, Configuration,  BootstrapAction or any other...
>> since it is a Flink dependency
>>
>> Thank you
>>
>>
>>
>>
>>
>


Re: Issues with Watermark generation after join

2020-03-16 Thread Dominik Wosiński
Actually, I just put this process function there for debugging purposes. My
main goal is to join the E & C using the Temporal Table function, but I
have observed exactly the same behavior i.e. when the parallelism was > 1
there was no output and when I was setting it to 1 then the output was
generated. So, I have switched to process function to see whether the
watermarks are reaching this stage.

Best Regards,
Dom.

pon., 16 mar 2020 o 19:46 Theo Diefenthal 
napisał(a):

> Hi Dominik,
>
> I had the same issue once with a custom process function. My process function
> buffered the data for a while and then output it again. As the process
> function can do anything with the data (transforming, buffering,
> aggregating...), I think it's just not safe for Flink to reason about the
> watermark of the output.
>
> I solved all my issues by calling `assignTimestampsAndWatermarks` directly
> after the (co-)process function.
>
> Best regards
> Theo
>
> --
> *Von: *"Dominik Wosiński" 
> *An: *"user" 
> *Gesendet: *Montag, 16. März 2020 16:55:18
> *Betreff: *Issues with Watermark generation after join
>
> Hey,
> I have noticed a weird behavior with a job that I am currently working on.
> I have 4 different streams from Kafka, lets call them A, B, C and D. Now
> the idea is that first I do SQL Join of A & B based on some field, then I
> create append stream from Joined A, let's call it E. Then I need to
> assign timestamps to E since it is a result of joining and Flink can't
> figure out the timestamps.
>
> Next, I union E & C, to create some F stream. Then finally I connect E & C
> using `keyBy` and CoProcessFunction. Now the issue I am facing is that it
> works fine if I enforce the parallelism of E to be 1 by invoking
> *setParallelism*. But if parallelism is higher than 1, for the same data
> - the watermark is not progressing correctly. I can see that 
> *CoProcessFunction
> *methods are invoked and that data is produced, but the Watermark is
> never progressing for this function. What I can see is that watermark is
> always equal to (0 - allowedOutOfOrderness). I can see that timestamps are
> correctly extracted and when I add debug prints I can actually see that
> Watermarks are generated for all streams, but for some reason, if the
> parallelism is > 1 they will never progress up to connect function. Is
> there anything that needs to be done after SQL joins that I don't know of
> ??
>
> Best Regards,
> Dom.
>


Re: Issues with Watermark generation after join

2020-03-16 Thread Theo Diefenthal
Hi Dominik, 

I had the same issue once with a custom process function. My process function buffered
the data for a while and then output it again. As the process function can do
anything with the data (transforming, buffering, aggregating...), I think it's
just not safe for Flink to reason about the watermark of the output.

I solved all my issues by calling `assignTimestampsAndWatermarks` directly
after the (co-)process function.

Best regards 
Theo 
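
A minimal, self-contained sketch of that workaround, with a trivial ProcessFunction standing in for the (co-)process function and the elements doubling as event timestamps (the 10-second out-of-orderness bound is a placeholder):

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class ReassignWatermarksSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // The elements double as event timestamps (milliseconds) to keep the sketch small.
        DataStream<Long> events = env.fromElements(1_000L, 2_000L, 3_000L);

        DataStream<Long> processed = events
            .process(new ProcessFunction<Long, Long>() {     // stands in for the (co-)process function
                @Override
                public void processElement(Long value, Context ctx, Collector<Long> out) {
                    out.collect(value);
                }
            })
            // Re-assign timestamps and watermarks right after the process function, because
            // Flink cannot reason about the watermark of whatever the function emits.
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Long>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Long element) {
                        return element;
                    }
                });

        processed.print();
        env.execute("reassign watermarks after process function");
    }
}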


Von: "Dominik Wosiński"  
An: "user"  
Gesendet: Montag, 16. März 2020 16:55:18 
Betreff: Issues with Watermark generation after join 

Hey, 
I have noticed a weird behavior with a job that I am currently working on. I 
have 4 different streams from Kafka, lets call them A, B, C and D. Now the idea 
is that first I do SQL Join of A & B based on some field, then I create append 
stream from Joined A, let's call it E. Then I need to assign timestamps to E 
since it is a result of joining and Flink can't figure out the timestamps. 

Next, I union E & C, to create some F stream. Then finally I connect E & C 
using `keyBy` and CoProcessFunction. Now the issue I am facing is that it works
fine if I enforce the parallelism of E to be 1 by invoking
setParallelism. But if parallelism is higher than 1, for the same data - the
watermark is not progressing correctly. I can see that CoProcessFunction 
methods are invoked and that data is produced, but the Watermark is never 
progressing for this function. What I can see is that watermark is always equal 
to (0 - allowedOutOfOrderness). I can see that timestamps are correctly 
extracted and when I add debug prints I can actually see that Watermarks are 
generated for all streams, but for some reason, if the parallelism is > 1 they 
will never progress up to connect function. Is there anything that needs to be 
done after SQL joins that I don't know of ?? 

Best Regards, 
Dom. 


Fwd: AfterMatchSkipStrategy for timed out patterns

2020-03-16 Thread Dominik Wosiński
Hey all,

I was wondering whether for CEP the *AfterMatchSkipStrategy *is applied
during matching or if simply the results are removed after the match. The
question is the result of the experiments I was doing with CEP. Say I have
the readings from some sensor and I want to detect events over some
threshold. So I have something like below:

Pattern.begin[AccelVector]("beginning",
AfterMatchSkipStrategy.skipPastLastEvent())
  .where(_.data() < Threshold)
  .optional
  .followedBy(EventPatternName)
  .where(event => event.data() >= Threshold)
  .oneOrMore
  .greedy
  .consecutive()
  .followedBy("end")
  .where(_.data() < Threshold)
  .oneOrMore
  .within(Time.minutes(1))


The thing is that sometimes sensors may stop sending data or the data is
lost so I would like to emit events that have technically timed out. I have
created a PatternProcessFunction that simply gets events that have timed
out and check for *EventPatternName* part.

It works fine, but I have noticed a weird behavior: the events that get
passed to *processTimedOutMatch* are repeated as if there were no
*AfterMatchSkipStrategy*.

So, for example say the Threshold=200, and I have the following events for
one of the sensors:
Event1 (timestamp= 1, data = 10)
Event2 (timestamp= 2, data = 250)
Event3 (timestamp= 3, data = 300)
Event4 (timestamp= 4, data = 350)
Event5 (timestamp= 5, data = 400)
Event6 (timestamp= 6, data = 450)

After that, this sensor stops sending data but others are sending data so
the watermark is progressing - this obviously causes timeout of the
pattern. And the issue I have is the fact that  *processTimedOutMatch* gets
called multiple times, first for the whole pattern Event1 to Event6 and
each call just skips one event so next, I have Event2 to Event 6, Event3 to
Event6 up to just Event6.

My understanding is that *AfterMatchSkipStrategy *should wipe out those
partial matches or does it work differently for timed out matches?

Thanks in advance,
Best Regards,
Dom.
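
For reference, a minimal, hypothetical sketch (in Java) of a PatternProcessFunction that also receives timed-out partial matches via TimedOutPartialMatchHandler; "EventPatternName" follows the pattern definition above, and the event type is left generic rather than tied to the AccelVector class from the message:

import java.util.List;
import java.util.Map;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.util.Collector;

public class ThresholdMatchFunction<T>
        extends PatternProcessFunction<T, String>
        implements TimedOutPartialMatchHandler<T> {

    @Override
    public void processMatch(Map<String, List<T>> match, Context ctx, Collector<String> out) {
        // Complete matches (ended by a reading back under the threshold) arrive here.
        out.collect("complete match, events over threshold: " + match.get("EventPatternName").size());
    }

    @Override
    public void processTimedOutMatch(Map<String, List<T>> match, Context ctx) {
        // Partial matches that hit within(Time.minutes(1)) arrive here; the question above is
        // whether AfterMatchSkipStrategy also prunes these, or whether each suffix of the
        // partial match is reported separately.
        List<T> overThreshold = match.get("EventPatternName");
        if (overThreshold != null && !overThreshold.isEmpty()) {
            // e.g. emit them to a side output via ctx.output(outputTag, ...)
        }
    }
}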


How do I get the outPoolUsage value inside my own stream operator?

2020-03-16 Thread Felipe Gutierrez
Hi community,

I have built my own operator (not a UDF) and I want to collect the
"outPoolUsage" metric inside it. How do I do that, assuming that I
have to make some modifications to the source code?

I know that the Gouge comes from
flink-runtime/org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge.java.
Inside of my operator MyAbstractUdfStreamOperator I can get the
"MetricGroup metricGroup = this.getMetricGroup()".
Then I implemented the "Gauge gauge = (Gauge)
metricGroup.getMetric("outPoolUsage");" but it returns null all the
time. Even when I click on the Backpressure UI Interface.

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Issues with Watermark generation after join

2020-03-16 Thread Dominik Wosiński
Hey,
I have noticed a weird behavior with a job that I am currently working on.
I have 4 different streams from Kafka, let's call them A, B, C and D. Now
the idea is that first I do an SQL join of A & B based on some field, then I
create an append stream from the joined result, let's call it E. Then I need to
assign timestamps to E, since it is the result of a join and Flink can't
figure out the timestamps on its own.

Next, I union E & C to create some stream F. Then finally I connect E & C
using `keyBy` and CoProcessFunction. Now the issue I am facing: it works fine
if I enforce the parallelism of E to be 1 by invoking *setParallelism*, but if
the parallelism is higher than 1, then for the same data
the watermark does not progress correctly. I can see that *CoProcessFunction
*methods are invoked and that data is produced, but the Watermark is never
progressing for this function. What I can see is that watermark is always
equal to (0 - allowedOutOfOrderness). I can see that timestamps are
correctly extracted and when I add debug prints I can actually see that
Watermarks are generated for all streams, but for some reason, if the
parallelism is > 1 they will never progress up to connect function. Is
there anything that needs to be done after SQL joins that I don't know of
??

Best Regards,
Dom.


Re: Implicit Flink Context Documentation

2020-03-16 Thread Padarn Wilson
Thanks for the clarification. I'll dig in then!

On Mon, 16 Mar 2020, 3:47 pm Piotr Nowojski,  wrote:

> Hi,
>
> We are not maintaining internal docs. We have design docs for newly
> proposed features (previously informal design docs published on dev mailing
> list and recently as FLIP documents [1]), but keyed state is such an old
> concept that dates back so much into the past, that I’m pretty sure it pre
> dates any of that. So you would have to digg through the code if you want
> to understand it.
>
> Piotrek
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>
> On 13 Mar 2020, at 16:14, Padarn Wilson  wrote:
>
> Thanks Piotr,
>
> Conceptually I understand (and use) the key'ed state quite a lot, but the
> implementation details are what I was looking for.
>
> It looks like
> `org.apache.flink.streaming.api.operators.AbstractStreamOperator#setKeyContextElement1`
> is what I'm looking for though. It would be cool if there were some
> internals design doc, however? It is quite hard to dig through the code, as
> there is a lot tied to how the execution of the job actually happens.
>
> Padarn
>
> On Fri, Mar 13, 2020 at 9:43 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> Please take a look for example here:
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-state
>> And the example in particular
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state
>>
>> The part about "there is a specific key implicitly in context” might be
>> referring to the fact, that for every instance of `CountWindowAverage` that
>> will be running in the cluster, user doesn’t have to set the key context
>> explicitly. Flink will set the key context automatically for the
>> `ValueState<Tuple2<Long, Long>> sum;` before any invocation of
>> `CountWindowAverage#flatMap` method.
>>
>> In other words, one parallel instance of `CountWindowAverage` function,
>> for two consecutive invocations of `CountWindowAverage#flatMap` can be
>> referring to different underlying value of `CountWindowAverage#sum` field.
>> For details you could take a look at
>> `org.apache.flink.streaming.api.operators.AbstractStreamOperator#setKeyContextElement1`
>> method and how it’s being used/implemented.
>>
>> I hope that helps.
>>
>> Piotrek
>>
>> On 13 Mar 2020, at 08:20, Padarn Wilson  wrote:
>>
>> Hi Users,
>>
>> I am trying to understand the details of how some aspects of Flink work.
>>
>> While understanding `keyed state` I kept coming up against a claim that 
>> `there
>> is a specific key implicitly in context` I would like to understand how
>> this works, which I'm guessing means understanding the details of the
>> runtime context: Is there any documentation or FLIP someone can recommend
>> on this?
>>
>>
>>
>
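
For reference, the CountWindowAverage example that the linked documentation (and
the discussion above) refers to looks roughly like this; a minimal sketch, where
each flatMap invocation reads and writes the state of whatever key is implicitly
in context for the current record:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountWindowAverage
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    // Keyed state: Flink scopes this handle to the key of the record being processed.
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average",
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        sum = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> input,
                        Collector<Tuple2<Long, Long>> out) throws Exception {
        // value()/update() touch the state of the key that the runtime set as the
        // current key context (see setKeyContextElement1) before this call.
        Tuple2<Long, Long> currentSum = sum.value();
        if (currentSum == null) {
            currentSum = Tuple2.of(0L, 0L);
        }
        currentSum.f0 += 1;
        currentSum.f1 += input.f1;
        sum.update(currentSum);

        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }
}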


Re: [EXT.MSG] Re: datadog http reporter metrics

2020-03-16 Thread Chesnay Schepler
It would only be logged when using 1.10 unfortunately; but you should be 
able to use the 1.10 version of the reporter with your version of Flink 
to at least confirm that it is the same issue as FLINK-16611.
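
Swapping the jar does not change the configuration side; for reference, a sketch
of the usual flink-conf.yaml entries (the api key and tags are placeholders):

# copy flink-metrics-datadog-1.10.0.jar (shipped under opt/) into lib/
metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: <your-api-key>
metrics.reporter.dghttp.tags: myflinkapp,prod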


On 16/03/2020 11:35, Yitzchak Lieberman wrote:

No, I tried to find error/warn logs for rejected metrics, but found nothing.
For that case there should be an error, right? (when the report is too large)
I saw that there are some changes on version 1.10 for datadog 
reporter, maybe I should upgrade to this version?


On Mon, Mar 16, 2020 at 11:47 AM Chesnay Schepler wrote:


Do you see anything in the logs? In another thread a user reported
that the datadog reporter could stop working when faced with a
large number of metrics since datadog was rejecting the report due
to being too large.

On 15/03/2020 12:22, Yitzchak Lieberman wrote:

Anyone?

On Wed, Mar 11, 2020 at 11:23 PM Yitzchak Lieberman wrote:

Hi.

Did someone encountered problem with sending metrics with
datadog http reporter?
My setup is flink version 1.8.2 deployed on k8s with 1 job
manager and 10 task managers.
Every version deploy I see metrics on my dashboard but after
a few minutes its stopped being sent from all task managers
while job manager still sends (with no error/warn on the logs).
Is it possible to be blocked by datadog due to the cluster
size? my staging cluster with 3 servers sends without any
problem.

Thanks in advance,
Yitzchak.







Re: [EXT.MSG] Re: datadog http reporter metrics

2020-03-16 Thread Yitzchak Lieberman
No, I tried to find error/warn logs for rejected metrics, but found nothing.
For that case there should be an error, right? (when the report is too large)
I saw that there are some changes on version 1.10 for datadog reporter,
maybe I should upgrade to this version?

On Mon, Mar 16, 2020 at 11:47 AM Chesnay Schepler 
wrote:

> Do you see anything in the logs? In another thread a user reported that
> the datadog reporter could stop working when faced with a large number of
> metrics since datadog was rejecting the report due to being too large.
>
> On 15/03/2020 12:22, Yitzchak Lieberman wrote:
>
> Anyone?
>
> On Wed, Mar 11, 2020 at 11:23 PM Yitzchak Lieberman <
> yitzch...@sentinelone.com> wrote:
>
>> Hi.
>>
>> Did someone encountered problem with sending metrics with datadog http
>> reporter?
>> My setup is flink version 1.8.2 deployed on k8s with 1 job manager and 10
>> task managers.
>> Every version deploy I see metrics on my dashboard but after a few
>> minutes its stopped being sent from all task managers while job manager
>> still sends (with no error/warn on the logs).
>> Is it possible to be blocked by datadog due to the cluster size? my
>> staging cluster with 3 servers sends without any problem.
>>
>> Thanks in advance,
>> Yitzchak.
>>
>
>


Re: Automatically Clearing Temporary Directories

2020-03-16 Thread David Maddison
Thanks for the responses and thanks Gary for the confirmation.

Just to give some background, we deploy Flink inside Kubernetes so there is
a chance that TaskManagers COULD be shut down in a non-graceful way leaving
cache artifacts on the temporary volumes.

With Gary's confirmation, we'll add an init container to make sure the
volumes are cleared before a TM starts.
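
A rough sketch of what that could look like in the TaskManager pod spec (the
image, volume name and mount path below are placeholders for our actual setup):

initContainers:
  - name: clean-tm-tmp
    image: busybox:1.31
    # wipe the dedicated temporary volume before the TaskManager container starts
    command: ["sh", "-c", "rm -rf /flink-tmp/*"]
    volumeMounts:
      - name: flink-tmp
        mountPath: /flink-tmp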

/David/

On Thu, Mar 12, 2020 at 8:24 AM Gary Yao  wrote:

> Hi David,
>
> > Would it be safe to automatically clear the temporary storage every time
> when a TaskManager is started?
> > (Note: the temporary volumes in use are dedicated to the TaskManager and
> not shared :-)
> Yes, it is safe in your case.
>
> Best,
> Gary
>
> On Tue, Mar 10, 2020 at 6:39 PM David Maddison 
> wrote:
>
>> Hi,
>>
>> When a TaskManager is restarted it can leave behind unreferenced
>> BlobServer cache directories in the temporary storage that never get
>> cleaned up.  Would it be safe to automatically clear the temporary storage
>> every time when a TaskManager is started?
>>
>> (Note: the temporary volumes in use are dedicated to the TaskManager and
>> not shared :-)
>>
>> Thanks in advance,
>>
>> David.
>>
>


Re: Flink on Kubernetes Vs Flink Natively on Kubernetes

2020-03-16 Thread Pankaj Chand
Hi Xintong,

Thank you for the explanation!

If I run Flink "natively" on Kubernetes, will I also be able to run Spark
on the same Kubernetes cluster, or will that reserve the Kubernetes cluster
for Flink only?

Thank you!

Pankaj

On Mon, Mar 16, 2020 at 5:41 AM Xintong Song  wrote:

> Forgot to mention that "running Flink natively on Kubernetes" is newly
> introduced and is only available for Flink 1.10 and above.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Mar 16, 2020 at 5:40 PM Xintong Song 
> wrote:
>
>> Hi Pankaj,
>>
>> "Running Flink on Kubernetes" refers to the old way that basically
>> deploys a Flink standalone cluster on Kubernetes. We leverage scripts to
>> run Flink Master and TaskManager processes inside Kubernetes container. In
>> this way, Flink is not ware of whether it's running in containers or
>> directly on physical machines, and will not interact with the Kubernetes
>> Master. Flink Master reactively accept all registered TaskManagers, whose
>> number is decided by the Kubernetes replica.
>>
>> "Running Flink natively on Kubernetes" refers deploy Flink as a
>> Kubernetes Job. Flink Master will interact with Kubernetes Master, and
>> actively requests for pods/containers, like on Yarn/Mesos.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Mon, Mar 16, 2020 at 4:03 PM Pankaj Chand 
>> wrote:
>>
>>> Hi all,
>>>
>>> I want to run Flink, Spark and other processing engines on a single
>>> Kubernetes cluster.
>>>
>>> From the Flink documentation, I did not understand the difference
>>> between:
>>> (1) Running Flink on Kubernetes, Versus (2) Running Flink natively on
>>> Kubernetes.
>>>
>>> Could someone please explain the difference between the two, and when
>>> would you use which option?
>>>
>>> Thank you,
>>>
>>> Pankaj
>>>
>>


Re: datadog metrics

2020-03-16 Thread Chesnay Schepler

I've created https://issues.apache.org/jira/browse/FLINK-16611.

@Steve Any chance you could contribute your changes, or some insight on 
what would need to be changed?


On 11/03/2020 23:16, Steve Whelan wrote:

Hi Fabian,

We ran into the same issue. We modified the reporter to emit the 
metrics in chunks and it worked fine after. Would be interested in 
seeing a ticket on this as well.


- Steve

On Wed, Mar 11, 2020 at 5:13 AM Chesnay Schepler wrote:


Please open a JIRA; we may have to split the datatog report into
several chunks.

On 09/03/2020 07:47, Fanbin Bu wrote:

quote from the following link:

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-Query-named-operator-exceeds-80-characters-td24807.html#a24818

"This is a safeguard in the metric system to prevent extremely
long names
(as these could cause the reporting to fail); so long as the
prefix is
unique you can safely ignore this warning."

I do see from log that my sql operator name is too long and says
it's truncated.
But i still failed to report to datadog.

Thanks
Fanbin

On Sun, Mar 8, 2020 at 11:36 PM Fanbin Bu wrote:

Hi,

Has anybody seen this error before and what is the suggested
way to solve it?

2020-03-07 02:54:34,100 WARN
 org.apache.flink.metrics.datadog.DatadogHttpClient          
 - Failed to send request to Datadog (response was
Response{protocol=http/1.1, code=413, message=Request Entity
Too Large, url=https://app.datadoghq.com/api/v1/series?api_key=

thanks,
Fanbin







Re: datadog http reporter metrics

2020-03-16 Thread Chesnay Schepler
Do you see anything in the logs? In another thread a user reported that 
the datadog reporter could stop working when faced with a large number 
of metrics since datadog was rejecting the report due to being too large.


On 15/03/2020 12:22, Yitzchak Lieberman wrote:

Anyone?

On Wed, Mar 11, 2020 at 11:23 PM Yitzchak Lieberman wrote:


Hi.

Did someone encountered problem with sending metrics with datadog
http reporter?
My setup is flink version 1.8.2 deployed on k8s with 1 job manager
and 10 task managers.
Every version deploy I see metrics on my dashboard but after a few
minutes its stopped being sent from all task managers while job
manager still sends (with no error/warn on the logs).
Is it possible to be blocked by datadog due to the cluster size?
my staging cluster with 3 servers sends without any problem.

Thanks in advance,
Yitzchak.





Re: Flink on Kubernetes Vs Flink Natively on Kubernetes

2020-03-16 Thread Xintong Song
Hi Pankaj,

"Running Flink on Kubernetes" refers to the old way that basically deploys
a Flink standalone cluster on Kubernetes. We leverage scripts to run Flink
Master and TaskManager processes inside Kubernetes container. In this way,
Flink is not ware of whether it's running in containers or directly on
physical machines, and will not interact with the Kubernetes Master. Flink
Master reactively accept all registered TaskManagers, whose number is
decided by the Kubernetes replica.

"Running Flink natively on Kubernetes" refers deploy Flink as a Kubernetes
Job. Flink Master will interact with Kubernetes Master, and actively
requests for pods/containers, like on Yarn/Mesos.
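
For concreteness, a rough sketch of how the two modes are started (the manifest
names follow the standalone-on-Kubernetes appendix of the docs, and the session
options are only illustrative):

# Standalone on Kubernetes: you create the resources yourself; the TaskManager
# Deployment's replica count fixes the number of TaskManagers.
kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml

# Native Kubernetes (1.10+): Flink talks to the Kubernetes API server itself
# and requests TaskManager pods on demand.
./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=my-flink-session \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4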

Thank you~

Xintong Song



On Mon, Mar 16, 2020 at 4:03 PM Pankaj Chand 
wrote:

> Hi all,
>
> I want to run Flink, Spark and other processing engines on a single
> Kubernetes cluster.
>
> From the Flink documentation, I did not understand the difference between:
> (1) Running Flink on Kubernetes, Versus (2) Running Flink natively on
> Kubernetes.
>
> Could someone please explain the difference between the two, and when
> would you use which option?
>
> Thank you,
>
> Pankaj
>


Re: Flink on Kubernetes Vs Flink Natively on Kubernetes

2020-03-16 Thread Xintong Song
Forgot to mention that "running Flink natively on Kubernetes" is newly
introduced and is only available for Flink 1.10 and above.


Thank you~

Xintong Song



On Mon, Mar 16, 2020 at 5:40 PM Xintong Song  wrote:

> Hi Pankaj,
>
> "Running Flink on Kubernetes" refers to the old way that basically deploys
> a Flink standalone cluster on Kubernetes. We leverage scripts to run Flink
> Master and TaskManager processes inside Kubernetes container. In this way,
> Flink is not ware of whether it's running in containers or directly on
> physical machines, and will not interact with the Kubernetes Master. Flink
> Master reactively accept all registered TaskManagers, whose number is
> decided by the Kubernetes replica.
>
> "Running Flink natively on Kubernetes" refers deploy Flink as a Kubernetes
> Job. Flink Master will interact with Kubernetes Master, and actively
> requests for pods/containers, like on Yarn/Mesos.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Mar 16, 2020 at 4:03 PM Pankaj Chand 
> wrote:
>
>> Hi all,
>>
>> I want to run Flink, Spark and other processing engines on a single
>> Kubernetes cluster.
>>
>> From the Flink documentation, I did not understand the difference between:
>> (1) Running Flink on Kubernetes, Versus (2) Running Flink natively on
>> Kubernetes.
>>
>> Could someone please explain the difference between the two, and when
>> would you use which option?
>>
>> Thank you,
>>
>> Pankaj
>>
>


Re: Stop job with savepoint during graceful shutdown on a k8s cluster

2020-03-16 Thread Vijay Bhaskar
For point (1) above:
It's up to the user to choose a source and sink that support exactly-once
semantics, as per the documentation:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/guarantees.html
If we choose one of the supported source and sink combinations, duplicates will
be avoided.

For point (2)
If the communication between the JobManager and a TaskManager breaks during a
savepoint or checkpoint operation,
the checkpoint/savepoint will be declined and will not complete.

Regards
Bhaskar
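
For the graceful-maintenance case, the usual path is stop-with-savepoint from
the CLI; a hedged sketch (exact flags depend on the Flink version, and the
savepoint directory is a placeholder):

# Flink 1.9+: stop the job and take a savepoint in one step
bin/flink stop -p s3://my-bucket/savepoints <jobId>

# Older versions: cancel with savepoint
bin/flink cancel -s s3://my-bucket/savepoints <jobId>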

On Sat, Mar 14, 2020 at 4:54 PM shravan 
wrote:

> Our understanding is to stop job with savepoint, all the task manager
> will persist their state during savepoint. If a Task Manager receives a
> shutdown signal while savepoint is being taken, does it complete the
> savepoint before shutdown ?
> [Ans ] Why task manager is shutdown suddenly? Are you saying about handling
> unpredictable shutdown while taking
> savepoint? In that case You can also use retained check point. In case
> current checkpoint has issues because of shutdown
> you will have previous checkpoint. So that you can use it. Now you will
> have
> 2 options, either savepoint/checkpoint. One of them
> will always be available.
> *[Followup Question]* When the processing service is shutdown say for
> maintenance as it is a graceful shutdown we are looking at means to avoid
> duplicates as exactly once message processing is guaranteed by our service
> .
> We are already starting the job based on checkpoint or savepoint whichever
> is the latest. When the job is started from last good checkpoint it results
> in duplicates.
>
> The job manager K8S service is configured as remote job manager address
> in Task Manager. This service may not be available during savepoint,  will
> this affect the communication between Task Manager and Job Manager during
> savepoint ?
> [Ans] you can go for HA right? Where you can run more than one jobmanager
> so
> that one is always service is available
> *[Followup Question]* As we mentioned above processing service is shut down
> for maintenance.
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-16 Thread Andrey Zagrebin
Thanks for the further feedback Thomas and Yangze.

> A generic, dynamic configuration mechanism based on environment variables
is essential and it is already supported via envsubst and an environment
variable that can supply a configuration fragment

True, we already have this. As I understand this was introduced for
flexibility to template a custom flink-conf.yaml with env vars, put it into
the FLINK_PROPERTIES and merge it with the default one.
Could we achieve the same with the dynamic properties (-Drpc.port=1234),
passed as image args to run it, instead of FLINK_PROPERTIES?
They could be also parametrised with env vars. This would require
jobmanager.sh to properly propagate them to
the StandaloneSessionClusterEntrypoint though:
https://github.com/docker-flink/docker-flink/pull/82#issuecomment-525285552
cc @Till
This would provide a unified configuration approach.
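
To illustrate the two styles side by side (a sketch; only the FLINK_PROPERTIES
form works with today's docker-entrypoint.sh, while the second form is what the
unified approach discussed here would enable):

# today: a configuration fragment merged into flink-conf.yaml by the entrypoint
docker run -e FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:1.10 jobmanager

# proposed: the same option passed as a dynamic property / image arg
docker run flink:1.10 jobmanager -Djobmanager.rpc.address=jobmanager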

> On the flip side, attempting to support a fixed subset of configuration
options is brittle and will probably lead to compatibility issues down the
road

I agree with it. The idea was to have just some shortcut scripted functions
to set options in flink-conf.yaml for a custom Dockerfile or entry point
script.
TASK_MANAGER_NUMBER_OF_TASK_SLOTS could be set as a dynamic property of
started JM.
I am not sure how many users depend on it. Maybe we could remove it.
It also looks like we already have a somewhat unclean state in
the docker-entrypoint.sh, where some ports are set to hardcoded values
and then FLINK_PROPERTIES is applied, potentially duplicating options in
the resulting flink-conf.yaml.

I can see some potential usage of env vars as standard entry point args but
for purposes related to something which cannot be achieved by passing entry
point args, like changing flink-conf.yaml options. Nothing comes into my
mind at the moment. It could be some setting specific to the running mode
of the entry point. The mode itself can stay the first arg of the entry
point.

> I would second that it is desirable to support Java 11

> Regarding supporting JAVA 11:
> - Not sure if it is necessary to ship JAVA. Maybe we could just change
> the base image from openjdk:8-jre to openjdk:11-jre in template docker
> file[1]. Correct me if I understand incorrectly. Also, I agree to move
> this out of the scope of this FLIP if it indeed takes much extra
> effort.

This is what I meant by bumping up the Java version in the docker hub Flink
image:
FROM openjdk:8-jre -> FROM openjdk:11-jre
This can be polled separately in the user mailing list.

> and in general use a base image that allows the (straightforward) use of
more recent versions of other software (Python etc.)

We could also poll separately whether to always include some version of Python
in the docker hub image.
A potential problem here is once it is there, it is some hassle to
remove/change it in a custom extended Dockerfile.

It would be also nice to avoid maintaining images for various combinations
of installed Java/Scala/Python in docker hub.

> Regarding building from local dist:
> - Yes, I bring this up mostly for development purpose. Since k8s is
> popular, I believe more and more developers would like to test their
> work on k8s cluster. I'm not sure should all developers write a custom
> docker file themselves in this scenario. Thus, I still prefer to
> provide a script for devs.
> - I agree to keep the scope of this FLIP mostly for those normal
> users. But as far as I can see, supporting building from local dist
> would not take much extra effort.
> - The maven docker plugin sounds good. I'll take a look at it.

I would see any scripts introduced in this FLIP also as potential building
blocks for a custom dev Dockerfile.
Maybe, this will be all what we need for dev images or we write a dev
Dockerfile, highly parametrised for building a dev image.
If scripts stay in apache/flink-docker, it is also somewhat inconvenient to
use them in the main Flink repo but possible.
If we move them to apache/flink then we will have to e.g. include them into
the release to make them easily available in apache/flink-docker and
maintain them in main repo, although they are only docker specific.
All in all, I would say, once we implement them, we can revisit this topic.

Best,
Andrey

On Wed, Mar 11, 2020 at 8:58 AM Yangze Guo  wrote:

> Thanks for the reply, Andrey.
>
> Regarding building from local dist:
> - Yes, I bring this up mostly for development purpose. Since k8s is
> popular, I believe more and more developers would like to test their
> work on k8s cluster. I'm not sure should all developers write a custom
> docker file themselves in this scenario. Thus, I still prefer to
> provide a script for devs.
> - I agree to keep the scope of this FLIP mostly for those normal
> users. But as far as I can see, supporting building from local dist
> would not take much extra effort.
> - The maven docker plugin sounds good. I'll take a look at it.
>
> Regarding supporting JAVA 11:
> - Not sure if it is necessary to ship JAVA. Maybe we 

Re: Full build error

2020-03-16 Thread Yangze Guo
What is the error message? And which branch is the code on?
You could pull the latest master again; from what I can see, the latest master
currently builds in full [1].

[1] 
https://travis-ci.org/github/apache/flink/builds/662890263?utm_source=github_status_medium=notification

Best,
Yangze Guo



On Mon, Mar 16, 2020 at 4:48 PM 吴志勇 <1154365...@qq.com> wrote:
>
> Running `mvn clean install -DskipTests` in the project root directory fails with an error.
>
> The code was freshly downloaded from GitHub.


Full build error

2020-03-16 Thread 吴志勇
Running `mvn clean install -DskipTests` in the project root directory fails
with an error.

The code was freshly downloaded from GitHub.

Re: Compilation problem with the latest code

2020-03-16 Thread tison
Hi,

You'd better use English in the user mailing list.

If you prefer Chinese, you can post the email to user...@flink.apache.org .

Best,
tison.


tison  wrote on Mon, Mar 16, 2020 at 4:25 PM:

> Run mvn clean install -DskipTests from the flink/ root directory.
>
> Your problem occurs because those impl classes are generated classes; in
> general, running a full build once from the root directory resolves all
> kinds of odd issues like this.
>
> Best,
> tison.
>
>
> 吴志勇 <1154365...@qq.com> wrote on Mon, Mar 16, 2020 at 4:23 PM:
>
>> Hello,
>> I downloaded the latest code from GitHub. When I try to compile it in
>> IDEA, the flink-sql-parser module of the flink-table project fails to
>> compile,
>>
>> and the tests fail with the same error.
>>
>> How can I resolve this? flink-sql-parser seems to be missing the impl
>> package.
>>
>


Re: Compilation problem with the latest code

2020-03-16 Thread tison
Hi,

You'd better use English in the user mailing list.

If you prefer Chinese, you can post the email to user-zh@flink.apache.org .

Best,
tison.


tison  wrote on Mon, Mar 16, 2020 at 4:25 PM:

> Run mvn clean install -DskipTests from the flink/ root directory.
>
> Your problem occurs because those impl classes are generated classes; in
> general, running a full build once from the root directory resolves all
> kinds of odd issues like this.
>
> Best,
> tison.
>
>
> 吴志勇 <1154365...@qq.com> wrote on Mon, Mar 16, 2020 at 4:23 PM:
>
>> Hello,
>> I downloaded the latest code from GitHub. When I try to compile it in
>> IDEA, the flink-sql-parser module of the flink-table project fails to
>> compile,
>>
>> and the tests fail with the same error.
>>
>> How can I resolve this? flink-sql-parser seems to be missing the impl
>> package.
>>
>


Re: Compilation problem with the latest code

2020-03-16 Thread tison
Run mvn clean install -DskipTests from the flink/ root directory.

Your problem occurs because those impl classes are generated classes; in
general, running a full build once from the root directory resolves all kinds
of odd issues like this.

Best,
tison.


吴志勇 <1154365...@qq.com> wrote on Mon, Mar 16, 2020 at 4:23 PM:

> Hello,
> I downloaded the latest code from GitHub. When I try to compile it in IDEA,
> the flink-sql-parser module of the flink-table project fails to compile,
>
> and the tests fail with the same error.
>
> How can I resolve this? flink-sql-parser seems to be missing the impl
> package.
>


Compilation problem with the latest code

2020-03-16 Thread 吴志勇
Hello,
I downloaded the latest code from GitHub. When I try to compile it in IDEA, the
flink-sql-parser module of the flink-table project fails to compile,

and the tests fail with the same error.

How can I resolve this? flink-sql-parser seems to be missing the impl package.

Streaming File Sink

2020-03-16 Thread cs
Streaming File Sink parquet avro bulk write:
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(60 * 1000, CheckpointingMode.AT_LEAST_ONCE);
env.setStateBackend(new FsStateBackend(new Path("file:///g:/checkpoint")));
Schema schema = new 
Schema.Parser().parse("{\"namespace\":\"example.avro\",\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
final DataStreamSource

Flink on Kubernetes Vs Flink Natively on Kubernetes

2020-03-16 Thread Pankaj Chand
Hi all,

I want to run Flink, Spark and other processing engines on a single
Kubernetes cluster.

From the Flink documentation, I did not understand the difference between:
(1) Running Flink on Kubernetes, Versus (2) Running Flink natively on
Kubernetes.

Could someone please explain the difference between the two, and when would
you use which option?

Thank you,

Pankaj


Re: Implicit Flink Context Documentation

2020-03-16 Thread Piotr Nowojski
Hi,

We are not maintaining internal docs. We have design docs for newly proposed 
features (previously informal design docs published on dev mailing list and 
recently as FLIP documents [1]), but keyed state is such an old concept that 
dates back so far into the past that I’m pretty sure it predates any of 
that. So you would have to dig through the code if you want to understand it.

Piotrek

[1] 
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals 


> On 13 Mar 2020, at 16:14, Padarn Wilson  wrote:
> 
> Thanks Piotr,
> 
> Conceptually I understand (and use) the key'ed state quite a lot, but the 
> implementation details are what I was looking for.
> 
> It looks like 
> `org.apache.flink.streaming.api.operators.AbstractStreamOperator#setKeyContextElement1`
>  is what I'm looking for though. It would be cool if there were some 
> internals design doc, however? It is quite hard to dig through the code, as 
> there is a lot tied to how the execution of the job actually happens.
> 
> Padarn
> 
> On Fri, Mar 13, 2020 at 9:43 PM Piotr Nowojski wrote:
> Hi,
> 
> Please take a look for example here:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-state
>  
> 
> And the example in particular
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state
>  
> 
> 
> The part about "there is a specific key implicitly in context” might be 
> referring to the fact, that for every instance of `CountWindowAverage` that 
> will be running in the cluster, user doesn’t have to set the key context 
> explicitly. Flink will set the key context automatically for the 
> `ValueState<Tuple2<Long, Long>> sum;` before any invocation of 
> `CountWindowAverage#flatMap` method.
> 
> In other words, one parallel instance of `CountWindowAverage` function, for 
> two consecutive invocations of `CountWindowAverage#flatMap` can be referring 
> to different underlying value of `CountWindowAverage#sum` field. For details 
> you could take a look at 
> `org.apache.flink.streaming.api.operators.AbstractStreamOperator#setKeyContextElement1`
>  method and how it’s being used/implemented.
> 
> I hope that helps.
> 
> Piotrek
> 
>> On 13 Mar 2020, at 08:20, Padarn Wilson wrote:
>> 
>> Hi Users,
>> 
>> I am trying to understand the details of how some aspects of Flink work.
>> 
>> While understanding `keyed state` I kept coming up against a claim that 
>> `there is a specific key implicitly in context` I would like to understand 
>> how this works, which I'm guessing means understanding the details of the 
>> runtime context: Is there any documentation or FLIP someone can recommend on 
>> this?  
> 



Re: Communication between two queries

2020-03-16 Thread Piotr Nowojski
Hi,

Let us know if something doesn’t work :)

Piotrek

> On 16 Mar 2020, at 08:42, Mikael Gordani  wrote:
> 
> Hi,
> I'll try it out =) 
> 
> Cheers!
> 
> On Mon, Mar 16, 2020 at 08:32, Piotr Nowojski wrote:
> Hi,
> 
> In that case you could try to implement your `FilterFunction` as two input 
> operator, with broadcast control input, that would be setting the 
> `global_var`. Broadcast control input can be originating from some source, or 
> from some operator.
> 
> Piotrek
> 
>> On 13 Mar 2020, at 15:47, Mikael Gordani wrote:
>> 
>> Hi Piotr!
>> Thanks for your response, I'll try to explain what I'm trying to achieve in 
>> more detail:
>> 
> Essentially, if I have two queries which have the same operators and run 
> in the same task, I would want to figure out some way of controlling the 
> ingestion from a source to the respective queries in such a way that only one 
> of the queries receives data, based on a condition. 
>> For more context, the second query (query2), is equipped with instrumented 
>> operators, which are standard operators extended with some extra 
>> functionality, in my case, they enrich the tuples with meta-data.
>> 
>> Source --> Filter1 ---> rest of query1
>>|
>>v
>>Filter2 ---> rest of query2
>> 
>> By using filters prior to the queries, they allow records to pass depending 
>> on a condition, let's say a global boolean variable (which is initially set 
>> to false).
>> If it's set to true, Filter1 will accept every record and Filter2 will 
>> disregard every record.
>> If it's set to false, Filter2 will accept every record and Filter1 will 
>> disregard every record.
>> So the filter operators looks something like this: 
>> boolean global_var = false;
>> 
>> private static class filter1 implements FilterFunction<Tuple> {
>> @Override
>> public boolean filter(Tuple t) throws Exception {
>> return !global_var;
>> }
>> }
>> 
>> private static class filter2 implements FilterFunction<Tuple> {
>> @Override
>> public boolean filter(Tuple t) throws Exception {
>> return global_var;
>> }
>> }
>> 
>> Then later on, in the respective queries, there are some processing logic in 
>> which changes the value of the global variable, thus enabling and disabling 
>> the flow of data from the source to the respective queries.
>> The problem lies in this global variable being problematic in distributed 
>> deployments, in which I'm having a hard time figuring out how to solve.
>> Is it a bit more clear? =)
> 
> 
> 
> -- 
> Med Vänliga Hälsningar,
> Mikael Gordani



Re: Communication between two queries

2020-03-16 Thread Mikael Gordani
Hi,
I'll try it out =)

Cheers!

On Mon, Mar 16, 2020 at 08:32, Piotr Nowojski wrote:

> Hi,
>
> In that case you could try to implement your `FilterFunction` as two input
> operator, with broadcast control input, that would be setting the
> `global_var`. Broadcast control input can be originating from some source,
> or from some operator.
>
> Piotrek
>
> On 13 Mar 2020, at 15:47, Mikael Gordani  wrote:
>
> Hi Piotr!
> Thanks for your response, I'll try to explain what I'm trying to achieve
> in more detail:
>
> Essentially, if I have two queries which have the same operators and run
> in the same task, I would want to figure out some way of controlling the
> ingestion from *a source* to the respective queries in such a way that
> only one of the queries receives data, based on a condition.
> For more context, the second query (query2), is equipped with instrumented
> operators, which are standard operators extended with some extra
> functionality, in my case, they enrich the tuples with meta-data.
>
> Source --> *Filter1* ---> rest of query1
>|
>v
>*Filter2* ---> rest of query2
>
> By using *filters* prior to the queries, they allow records to pass
> depending on a condition*, *let's say a global boolean variable (which is
> initially set to false).
> If it's set to *true, Filter1 will accept every record and Filter2 will
> disregard every record.*
> If it's set to
> *false, Filter2 will accept every record and Filter1 will disregard every
> record.*
>
> *So the filter operators looks something like this: *
>
> boolean global_var = false;
>
> private static class filter1 implements FilterFunction<Tuple> {
> @Override
> public boolean filter(Tuple t) throws Exception {
> return !global_var;
> }
> }
>
> private static class filter2 implements FilterFunction<Tuple> {
> @Override
> public boolean filter(Tuple t) throws Exception {
> return global_var;
> }
> }
>
>
> Then later on, in the respective queries, there are some processing logic
> in which changes the value of the global variable, thus enabling and
> disabling the flow of data from the source to the respective queries.
> The problem lies in this global variable being problematic in distributed
> deployments, in which I'm having a hard time figuring out how to solve.
> Is it a bit more clear? =)
>
>
>

-- 
Med Vänliga Hälsningar,
Mikael Gordani


Re: Expected behaviour when changing operator parallelism but starting from an incremental checkpoint

2020-03-16 Thread Piotr Nowojski
Hi Seth,

> Currently, all rescaling operations technically work with checkpoints. That 
> is purely by chance that the implementation supports that, and the line is 
> because the community is not committed to maintaining that functionality

Are you sure that’s the case? Support for rescaling from checkpoints is, as far 
as I know, something that we want/need to have:
- if your cluster has just lost a node due to some hardware failure, without 
downscaling support your job will not be able to recover
- future planned live rescaling efforts

Also this [1] seems to contradict your statement?

Lack of support for rescaling from unaligned checkpoints will hopefully be a 
temporary limitation of the first version, and it’s on our roadmap to solve 
this in the future.

Piotrek

[1] 
https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html#rescaling-stateful-stream-processing-jobs
 


> On 13 Mar 2020, at 17:44, Seth Wiesman  wrote:
> 
> Hi Aaron, 
> 
> Currently, all rescaling operations technically work with checkpoints. That 
> is purely by chance that the implementation supports that, and the line is 
> because the community is not committed to maintaining that functionality. As 
> we add cases, such as unaligned checkpoints, which actually prevent rescaling 
> the documentation will be updated accordingly. FLIP-47 has more to do with 
> consolidating terminology and how actions are triggered and are not 
> particularly relevant to the discussion of rescaling jobs. 
> 
> On Fri, Mar 13, 2020 at 11:39 AM Aaron Levin wrote:
> Hi Piotr,
> 
> Thanks for your response! I understand that checkpoints and savepoints may be 
> diverging (for unaligned checkpoints) but parts also seem to be converging 
> per FLIP-47[0]. Specifically, in FLIP-47 they state that rescaling is 
> "Supported but not in all cases" for checkpoints. What I'm hoping to find is 
> guidance or documentation on when rescaling is supported for checkpoints, 
> and, more importantly, if the cases where it's not supported will result in 
> hard or silent failures. 
> 
> The context here is that we rely on the exactly-once semantics for our Flink 
> jobs in some important systems. In some cases when a job is in a bad state it 
> may not be able to take a checkpoint, but changing the job's parallelism may 
> resolve the issue. Therefore it's important for us to know if deploying from 
> a checkpoint, on purpose or by operator error, will break the semantic 
> guarantees of our job.
> 
> Hard failure in the cases where you cannot change parallelism would be the 
> desired outcome imo.
> 
> Thank you!
> 
> [0] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-47%3A+Checkpoints+vs.+Savepoints
>  
> 
> 
> Best,
> 
> Aaron Levin
> 
> On Fri, Mar 13, 2020 at 9:08 AM Piotr Nowojski wrote:
> Hi,
> 
> Generally speaking changes of parallelism is supported between checkpoints 
> and savepoints. Other changes to the job’s topology, like 
> adding/changing/removing operators, changing types in the job graph are only 
> officially supported via savepoints.
> 
> But in reality, as for now, there is no difference between checkpoints and 
> savepoints, but that’s subject to change, so it’s better not to relay this 
> behaviour. For example with unaligned checkpoints [1] (hopefully in 1.11), 
> there will be a difference between those two concepts.
> 
> Piotrek
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>  
> 
> 
>> On 12 Mar 2020, at 12:16, Aaron Levin wrote:
>> 
>> Hi,
>> 
>> What's the expected behaviour of:
>> 
>> * changing an operator's parallelism
>> * deploying this change from an incremental (RocksDB) checkpoint instead of 
>> a savepoint
>> 
>> The flink docs[0][1] are a little unclear on what the expected behaviour is 
>> here. I understand that the key-space is being changed because parallelism 
>> is changed. I've seen instances where this happens and a job does not fail. 
>> But how does it treat potentially missing state for a given key? 
>> 
>> I know I can test this, but I'm curious what the _expected_ behaviour is? 
>> I.e. what behaviour can I rely on, which won't change between versions or 
>> releases? Do we expect the job to fail? Do we expect missing keys to just be 
>> considered empty? 
>> 
>> Thanks!
>> 
>> [0] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint
>>  
>> 

Re: Communication between two queries

2020-03-16 Thread Piotr Nowojski
Hi,

In that case you could try to implement your `FilterFunction` as two input 
operator, with broadcast control input, that would be setting the `global_var`. 
Broadcast control input can be originating from some source, or from some 
operator.
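
A minimal sketch of that idea using the broadcast state pattern (all names are
illustrative, the Boolean control stream is an assumption, and the filter1 side
is symmetric, just negate the flag):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class SwitchingFilter {

    // Single-entry broadcast state holding the current value of "global_var".
    static final MapStateDescriptor<String, Boolean> SWITCH =
            new MapStateDescriptor<>("query-switch", Types.STRING, Types.BOOLEAN);

    public static DataStream<Tuple> routeToQuery2(DataStream<Tuple> source,
                                                  DataStream<Boolean> control) {
        BroadcastStream<Boolean> controlBroadcast = control.broadcast(SWITCH);

        return source
                .connect(controlBroadcast)
                .process(new BroadcastProcessFunction<Tuple, Boolean, Tuple>() {

                    @Override
                    public void processElement(Tuple value,
                                               ReadOnlyContext ctx,
                                               Collector<Tuple> out) throws Exception {
                        Boolean globalVar =
                                ctx.getBroadcastState(SWITCH).get("global_var");
                        // Behaves like filter2: forward records only while the switch is on.
                        if (Boolean.TRUE.equals(globalVar)) {
                            out.collect(value);
                        }
                    }

                    @Override
                    public void processBroadcastElement(Boolean value,
                                                        Context ctx,
                                                        Collector<Tuple> out) throws Exception {
                        // Every parallel filter instance sees the same control records.
                        ctx.getBroadcastState(SWITCH).put("global_var", value);
                    }
                });
    }
}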

Piotrek

> On 13 Mar 2020, at 15:47, Mikael Gordani  wrote:
> 
> Hi Piotr!
> Thanks for your response, I'll try to explain what I'm trying to achieve in 
> more detail:
> 
> Essentially, if I have two queries which have the same operators and run in 
> the same task, I would want to figure out some way of controlling the 
> ingestion from a source to the respective queries in such a way that only one 
> of the queries receives data, based on a condition. 
> For more context, the second query (query2), is equipped with instrumented 
> operators, which are standard operators extended with some extra 
> functionality, in my case, they enrich the tuples with meta-data.
> 
> Source --> Filter1 ---> rest of query1
>|
>v
>Filter2 ---> rest of query2
> 
> By using filters prior to the queries, they allow records to pass depending 
> on a condition, let's say a global boolean variable (which is initially set 
> to false).
> If it's set to true, Filter1 will accept every record and Filter2 will 
> disregard every record.
> If it's set to false, Filter2 will accept every record and Filter1 will 
> disregard every record.
> So the filter operators looks something like this: 
> boolean global_var = false;
> 
> private static class filter1 implements FilterFunction<Tuple> {
> @Override
> public boolean filter(Tuple t) throws Exception {
> return !global_var;
> }
> }
> 
> private static class filter2 implements FilterFunction<Tuple> {
> @Override
> public boolean filter(Tuple t) throws Exception {
> return global_var;
> }
> }
> 
> Then later on, in the respective queries, there are some processing logic in 
> which changes the value of the global variable, thus enabling and disabling 
> the flow of data from the source to the respective queries.
> The problem lies in this global variable being problematic in distributed 
> deployments, in which I'm having a hard time figuring out how to solve.
> Is it a bit more clear? =)