Re: How to gracefully handle job recovery failures

2021-06-15 Thread Li Peng
Understood, thanks all!

-Li

On Fri, Jun 11, 2021 at 12:40 AM Till Rohrmann  wrote:

> Hi Li,
>
> Roman is right about Flink's behavior and what you can do about it. The
> idea behind its current behavior is the following: If Flink cannot recover
> a job, it is very hard for it to tell whether it is due to an intermittent
> problem or a permanent one. No matter how often you retry, you can always
> run into the situation that you give up too early. Since we believe that
> this would be a very surprising behavior because it effectively means that
> Flink can forget about jobs in case of a recovery, we decided that this
> situation requires the intervention of the user to resolve. By requiring
> the user to make an explicit decision, we make the problem visible and
> force her to think about the situation. I hope this makes
> sense.
>
> So in your case, what you have to do is to remove the relevant ZooKeeper
> zNode which contains the pointer to the submitted job graph file. That way,
> Flink will no longer try to recover this job. I do agree that this is a bit
> cumbersome and it could definitely help to offer a small tool to do this
> kind of cleanup task.
>
> Cheers,
> Till
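(For reference, a minimal sketch of that manual cleanup with the ZooKeeper CLI, assuming the default HA layout of high-availability.zookeeper.path.root: /flink plus a cluster-id; the host, cluster-id, and job id below are placeholders, so adjust them to whatever your flink-conf.yaml uses.)

# connect to the HA ZooKeeper ensemble
bin/zkCli.sh -server <zookeeper-host>:2181

# inside the ZooKeeper shell: list the job graphs Flink will try to recover
ls /flink/<cluster-id>/jobgraphs

# remove the zNode pointing at the broken job graph
# (use deleteall, or rmr on older ZooKeeper versions, if the node has children)
deleteall /flink/<cluster-id>/jobgraphs/<job-id>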
>
> On Fri, Jun 11, 2021 at 8:24 AM Roman Khachatryan 
> wrote:
>
>> Hi Li,
>>
>> If I understand correctly, you want the cluster to proceed with recovery,
>> skipping some non-recoverable jobs (but still recovering others).
>> The only way I can think of is to remove the corresponding nodes in
>> ZooKeeper, which is not very safe.
>>
>> I'm pulling in Robert and Till who might know better.
>>
>> Regards,
>> Roman
>>
>>
>> On Thu, Jun 10, 2021 at 8:56 PM Li Peng  wrote:
>> >
>> > Hi Roman,
>> >
>> > Is there a way to abandon job recovery after a few tries? By that I
>> mean that this problem was fixed by me restarting the cluster without trying
>> to recover the job. Is there some setting that emulates what I did, so I
>> don't need to intervene manually if this happens again?
>> >
>> > Thanks,
>> > Li
>> >
>> > On Thu, Jun 10, 2021 at 9:50 AM Roman Khachatryan 
>> wrote:
>> >>
>> >> Hi Li,
>> >>
>> >> The missing file is a serialized job graph and the job recovery can't
>> >> proceed without it.
>> >> Unfortunately, the cluster can't proceed if one of the jobs can't
>> recover.
>> >>
>> >> Regards,
>> >> Roman
>> >>
>> >> On Thu, Jun 10, 2021 at 6:02 AM Li Peng  wrote:
>> >> >
>> >> > Hey folks, we have a cluster with HA mode enabled, and recently
>> after doing a zookeeper restart, our Kafka cluster (Flink v. 1.11.3, Scala
>> v. 2.12) crashed and was stuck in a crash loop, with the following error:
>> >> >
>> >> > 2021-06-10 02:14:52.123 [cluster-io-thread-1] ERROR
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Fatal error
>> occurred in the cluster entrypoint.
>> >> > java.util.concurrent.CompletionException:
>> org.apache.flink.util.FlinkRuntimeException: Could not recover job with job
>> id .
>> >> > at
>> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
>> >> > at
>> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
>> >> > at
>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
>> >> > at
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> >> > at
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> >> > at java.base/java.lang.Thread.run(Thread.java:834)
>> >> > Caused by: org.apache.flink.util.FlinkRuntimeException: Could not
>> recover job with job id .
>> >> > at
>> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:149)
>> >> > at
>> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125)
>> >> > at
>> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:200)
>> >> > at
>> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning

Re: How to gracefully handle job recovery failures

2021-06-10 Thread Li Peng
Hi Roman,

Is there a way to abandon job recovery after a few tries? By that I mean
that this problem was fixed by me restarting the cluster without trying to
recover the job. Is there some setting that emulates what I did, so I don't
need to intervene manually if this happens again?

Thanks,
Li

On Thu, Jun 10, 2021 at 9:50 AM Roman Khachatryan  wrote:

> Hi Li,
>
> The missing file is a serialized job graph and the job recovery can't
> proceed without it.
> Unfortunately, the cluster can't proceed if one of the jobs can't recover.
>
> Regards,
> Roman
>
> On Thu, Jun 10, 2021 at 6:02 AM Li Peng  wrote:
> >
> > Hey folks, we have a cluster with HA mode enabled, and recently after
> doing a zookeeper restart, our Kafka cluster (Flink v. 1.11.3, Scala v.
> 2.12) crashed and was stuck in a crash loop, with the following error:
> >
> > 2021-06-10 02:14:52.123 [cluster-io-thread-1] ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Fatal error
> occurred in the cluster entrypoint.
> > java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkRuntimeException: Could not recover job with job
> id .
> > at
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
> > at
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
> > at
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
> > at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> > at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> > at java.base/java.lang.Thread.run(Thread.java:834)
> > Caused by: org.apache.flink.util.FlinkRuntimeException: Could not
> recover job with job id .
> > at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:149)
> > at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125)
> > at
> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:200)
> > at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:115)
> > at
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
> > ... 3 common frames omitted
> > Caused by: org.apache.flink.util.FlinkException: Could not retrieve
> submitted JobGraph from state handle under
> /. This indicates that the retrieved state
> handle is broken. Try cleaning the state handle store.
> > at
> org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:192)
> > at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:146)
> > ... 7 common frames omitted
> > Caused by: java.io.FileNotFoundException: No such file or directory:
> s3a://myfolder/recovery/myservice/2021-05-25T04:36:33Z/submittedJobGraph06ea8512c493
> > at
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
> > at
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
> > at
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
> > at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
> > at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
> > at
> org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:120)
> > at
> org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:37)
> > at
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
> > at
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
> > at
> org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:65)
> > at
> org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
> > at
> org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:186)
> > ... 8 common frames omitted
> >
> > We have an idea of why the file might be gone and are addressing it, but
> my question is: how can I configure this so that a missing job file doesn't
> trap the cluster in an endless restart loop? Is there some setting to just
> treat this like a completely fresh deployment if the recovery file is
> missing?
> >
> > Thanks!
> > Li
> >
> >
> >
> >
>


How to gracefully handle job recovery failures

2021-06-09 Thread Li Peng
Hey folks, we have a cluster with HA mode enabled, and recently after doing
a zookeeper restart, our Kafka cluster (Flink v. 1.11.3, Scala v. 2.12)
crashed and was stuck in a crash loop, with the following error:

2021-06-10 02:14:52.123 [cluster-io-thread-1] ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Fatal error
occurred in the cluster entrypoint.
java.util.concurrent.CompletionException:
org.apache.flink.util.FlinkRuntimeException: Could not recover job with job
id .
at
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at
java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not recover
job with job id .
at
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:149)
at
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125)
at
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:200)
at
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:115)
at
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 3 common frames omitted
Caused by: org.apache.flink.util.FlinkException: Could not retrieve
submitted JobGraph from state handle under
/. This indicates that the retrieved state
handle is broken. Try cleaning the state handle store.
at
org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:192)
at
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:146)
... 7 common frames omitted
Caused by: java.io.FileNotFoundException: No such file or directory:
s3a://myfolder/recovery/myservice/2021-05-25T04:36:33Z/submittedJobGraph06ea8512c493
at
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
at
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:120)
at
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:37)
at
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
at
org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
at
org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:65)
at
org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
at
org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:186)
... 8 common frames omitted

We have an idea of why the file might be gone and are addressing it, but my
question is: *how can I configure this so that a missing job file doesn't
trap the cluster in an endless restart loop?* Is there some setting to just
treat this like a completely fresh deployment if the recovery file is
missing?

Thanks!
Li


Re: How do I increase number of db connections of the Flink JDBC Connector?

2021-02-19 Thread Li Peng
Ah got it, thanks!

On Thu, Feb 18, 2021 at 10:53 PM Chesnay Schepler 
wrote:

> Every worker uses exactly 1 connection, so in order to increase the
> number of connections you must indeed increase the worker parallelism.
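(A hedged sketch of what "increase the worker parallelism" can look like operationally; the slot count, parallelism, and jar name are made-up examples, and the sink's parallelism can also be set directly on the operator in the job code.)

# allow more parallel subtasks per TaskManager (flink-conf.yaml)
echo "taskmanager.numberOfTaskSlots: 4" >> conf/flink-conf.yaml

# submit with a higher default parallelism; each parallel JDBC sink subtask
# then opens its own database connection
bin/flink run -p 18 my-job.jar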
>
> On 2/19/2021 6:51 AM, Li Peng wrote:
> > Hey folks,
> >
> > I'm trying to use flink to write high throughput incoming data to a
> > SQL db using the JDBC Connector as described here:
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/jdbc.html
> > <
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/jdbc.html>
>
> >
> >
> > However, after enabling this, my data consumption rate slowed down to
> > a crawl. After doing some digging, it seemed like the number of
> > connections to the db was very low, almost 1 connection per worker
> > (there are 9 workers). That seemed like the culprit: Flink appeared to be
> > blocking and waiting on a single db connection to do all this work.
> >
> > Is there a way to tell the JDBC connector to use more db connections?
> > Or do I need to specifically increase the parallelism of the connector
> > to beyond the default?
> >
> > And if I need to increase the parallelism: my current setup has
> > 9 workers, 1 task slot each (this cluster is dedicated to
> > one job). That means I have to increase the number of task slots
> > before increasing parallelism, right?
> >
> > My flink version is 1.10.1 and my jdbc connection
> > is flink-connector-jdbc_2.11:1.11.0.
> >
> > Thanks!
> > Li
>
>
>


How do I increase number of db connections of the Flink JDBC Connector?

2021-02-18 Thread Li Peng
Hey folks,

I'm trying to use flink to write high throughput incoming data to a SQL db
using the JDBC Connector as described here:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/jdbc.html

However, after enabling this, my data consumption rate slowed down to a
crawl. After doing some digging, it seemed like the number of connections
to the db was very low, almost 1 connection per worker (there are 9
workers). That seemed like the culprit: Flink appeared to be blocking and
waiting on a single db connection to do all this work.

Is there a way to tell the JDBC connector to use more db connections? Or do
I need to specifically increase the parallelism of the connector to beyond
the default?

And if I need to increase the parallelism: my current setup has 9
workers, 1 task slot each (this cluster is dedicated to one job). That
means I have to increase the number of task slots before increasing
parallelism, right?

My flink version is 1.10.1 and my jdbc connection
is flink-connector-jdbc_2.11:1.11.0.

Thanks!
Li


Flink 1.10.1 not using FLINK_TM_HEAP for TaskManager JVM Heap size correctly?

2020-06-12 Thread Li Peng
Hey folks, we recently migrated from Flink 1.9.x to 1.10.1, and we noticed
some wonky behavior in how the JVM is configured:

1. We add the FLINK_JM_HEAP=5000m and FLINK_TM_HEAP=1400m variables to the
environment
2. The JobManager allocates the right heap size as expected
3. However, the TaskManager (started via taskmanager.sh), logs this instead:

 - 'taskmanager.memory.flink.size' is not specified, use the *configured
> deprecated task manager heap value (1.367gb (1468006400 bytes)) for it.*
>  - The derived from fraction jvm overhead memory (184.000mb (192937987
> bytes)) is less than its min value 192.000mb (201326592 bytes), min value
> will be used instead
> BASH_JAVA_UTILS_EXEC_RESULT:*-Xmx599785462 -Xms599785462*
> -XX:MaxDirectMemorySize=281018370 -XX:MaxMetaspaceSize=268435456


So the logs say it will use the configured 1400m as expected, but for some
reason it picks 599785462 bytes as the heap size instead (TaskManagerRunner
logs that the maximum heap size is 572 MiBytes, which confirms that the
1400m value is not being used).

Anyone know if I'm missing a setting here or something?

Thanks,
Li
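(A hedged sketch of how the heap could be pinned explicitly with the 1.10 memory options instead of relying on the deprecated FLINK_TM_HEAP mapping; the option names come from the Flink 1.10 memory model, the values are placeholders, and the exact split should be checked against the memory setup guide for your version.)

# flink-conf.yaml: set total Flink memory explicitly (this is what the
# deprecated heap value is mapped onto) ...
echo "taskmanager.memory.flink.size: 1400m" >> conf/flink-conf.yaml

# ... or pin the JVM heap components directly so -Xmx matches a known value
echo "taskmanager.memory.task.heap.size: 1024m" >> conf/flink-conf.yaml
echo "taskmanager.memory.framework.heap.size: 128m" >> conf/flink-conf.yaml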


Re: Task-manager kubernetes pods take a long time to terminate

2020-02-04 Thread Li Peng
My yml files follow most of the instructions here:

http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/

What command did you use to delete the deployments? I use: helm
--tiller-namespace prod delete --purge my-deployment

I noticed that for environments without much data (like staging), this
works flawlessly, but in production with a high volume of data, it gets stuck
in a loop. I suspect that the extra time needed to clean up the task
managers under high traffic delays the shutdown until after the job manager
terminates, and then the task manager gets stuck in a loop when it detects
that the job manager is dead.

Thanks,
Li

>


Re: Task-manager kubernetes pods take a long time to terminate

2020-02-04 Thread Li Peng
Hey Yang,

The jobmanager and taskmanagers are all part of the same deployment; when I
delete the deployment, all the pods are told to terminate.

The status of the taskmanager is "terminating", and it waits until the
taskmanager times out in that error loop before it actually terminates.

Thanks,
Li

On Thu, Jan 30, 2020 at 6:22 PM Yang Wang  wrote:

> I think if you want to delete your Flink cluster on K8s, then you need to
> directly delete all the
> created deployments (jobmanager deployment, taskmanager deployment). For the
> configmap and service,
> you could leave them there if you want to reuse them for the next Flink
> cluster deployment.
>
> What's the status of the taskmanager pod when you delete it and it gets stuck?
>
>
> Best,
> Yang
>
> Li Peng  wrote on Friday, January 31, 2020 at 4:51 AM:
>
>> Hi Yun,
>>
>> I'm currently specifying that specific RPC address in my kubernetes
>> charts for convenience; should I be generating a new one for every
>> deployment?
>>
>> And yes, I am deleting the pods using those commands; I'm just noticing
>> that the task-manager termination process is short-circuited by the
>> registration timeout check, so that instead of terminating quickly, the
>> task-manager waits for 5 minutes to time out before terminating. I'm
>> expecting it to just terminate without going through that registration
>> timeout; is there a way to configure that?
>>
>> Thanks,
>> Li
>>
>>
>> On Thu, Jan 30, 2020 at 8:53 AM Yun Tang  wrote:
>>
>>> Hi Li
>>>
>>> Why do you still use 'job-manager' as the jobmanager.rpc.address for the
>>> second new cluster? If you use another rpc address, previous task managers
>>> would not try to register with the old one.
>>>
>>> Take the Flink documentation [1] for k8s as an example. You can list/delete all
>>> pods like:
>>>
>>> kubectl get/delete pods -l app=flink
>>>
>>>
>>> By the way, the default registration timeout is 5 min [2]; task managers
>>> that cannot register with the JM will terminate themselves after 5 minutes.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#session-cluster-resource-definitions
>>> [2]
>>> https://github.com/apache/flink/blob/7e1a0f446e018681cb537dd936ae54388b5a7523/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java#L158
>>>
>>> Best
>>> Yun Tang
>>>
>>> --
>>> *From:* Li Peng 
>>> *Sent:* Thursday, January 30, 2020 9:24
>>> *To:* user 
>>> *Subject:* Task-manager kubernetes pods take a long time to terminate
>>>
>>> Hey folks, I'm deploying a Flink cluster via kubernetes, and starting
>>> each task manager with taskmanager.sh. I noticed that when I tell kubectl
>>> to delete the deployment, the job-manager pod usually terminates very
>>> quickly, but any task-manager that doesn't get terminated before the
>>> job-manager, usually gets stuck in this loop:
>>>
>>> 2020-01-29 09:18:47,867 INFO
>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
>>> resolve ResourceManager address 
>>> akka.tcp://flink@job-manager:6123/user/resourcemanager,
>>> retrying in 1 ms: Could not connect to rpc endpoint under address
>>> akka.tcp://flink@job-manager:6123/user/resourcemanager
>>>
>>> It then does this for about 10 minutes(?), and then shuts down. If I'm
>>> deploying a new cluster, this pod will try to register itself with the new
>>> job manager before terminating later. This isn't a troubling issue as far as
>>> I can tell, but I find it annoying that I sometimes have to force delete
>>> the pods.
>>>
>>> Any easy ways to just have the task managers terminate gracefully and
>>> quickly?
>>>
>>> Thanks,
>>> Li
>>>
>>


Re: Task-manager kubernetes pods take a long time to terminate

2020-01-30 Thread Li Peng
Hi Yun,

I'm currently specifying that specific RPC address in my kubernetes charts
for convenience; should I be generating a new one for every deployment?

And yes, I am deleting the pods using those commands; I'm just noticing
that the task-manager termination process is short-circuited by the
registration timeout check, so that instead of terminating quickly, the
task-manager waits for 5 minutes to time out before terminating. I'm
expecting it to just terminate without going through that registration
timeout; is there a way to configure that?

Thanks,
Li


On Thu, Jan 30, 2020 at 8:53 AM Yun Tang  wrote:

> Hi Li
>
> Why do you still use 'job-manager' as the jobmanager.rpc.address for the
> second new cluster? If you use another rpc address, previous task managers
> would not try to register with the old one.
>
> Take the Flink documentation [1] for k8s as an example. You can list/delete all
> pods like:
>
> kubectl get/delete pods -l app=flink
>
>
> By the way, the default registration timeout is 5 min [2]; task managers
> that cannot register with the JM will terminate themselves after 5 minutes.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#session-cluster-resource-definitions
> [2]
> https://github.com/apache/flink/blob/7e1a0f446e018681cb537dd936ae54388b5a7523/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java#L158
>
> Best
> Yun Tang
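(A hedged sketch of shortening that timeout so a stuck task manager gives up and exits sooner; taskmanager.registration.timeout is the option behind the 5-minute default in [2], and the 30 s value is just an illustrative choice, so verify it against your Flink version.)

# flink-conf.yaml
echo "taskmanager.registration.timeout: 30 s" >> conf/flink-conf.yaml

# or pass it at start-up, the same way the other -D options in this thread are passed
bin/taskmanager.sh start-foreground -Dtaskmanager.registration.timeout="30 s"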
>
> --
> *From:* Li Peng 
> *Sent:* Thursday, January 30, 2020 9:24
> *To:* user 
> *Subject:* Task-manager kubernetes pods take a long time to terminate
>
> Hey folks, I'm deploying a Flink cluster via kubernetes, and starting each
> task manager with taskmanager.sh. I noticed that when I tell kubectl to
> delete the deployment, the job-manager pod usually terminates very quickly,
> but any task-manager that doesn't get terminated before the job-manager,
> usually gets stuck in this loop:
>
> 2020-01-29 09:18:47,867 INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
> resolve ResourceManager address 
> akka.tcp://flink@job-manager:6123/user/resourcemanager,
> retrying in 1 ms: Could not connect to rpc endpoint under address
> akka.tcp://flink@job-manager:6123/user/resourcemanager
>
> It then does this for about 10 minutes(?), and then shuts down. If I'm
> deploying a new cluster, this pod will try to register itself with the new
> job manager before terminating later. This isn't a troubling issue as far as
> I can tell, but I find it annoying that I sometimes have to force delete
> the pods.
>
> Any easy ways to just have the task managers terminate gracefully and
> quickly?
>
> Thanks,
> Li
>


Task-manager kubernetes pods take a long time to terminate

2020-01-29 Thread Li Peng
Hey folks, I'm deploying a Flink cluster via kubernetes, and starting each
task manager with taskmanager.sh. I noticed that when I tell kubectl to
delete the deployment, the job-manager pod usually terminates very quickly,
but any task-manager that doesn't get terminated before the job-manager,
usually gets stuck in this loop:

2020-01-29 09:18:47,867 INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
resolve ResourceManager address
akka.tcp://flink@job-manager:6123/user/resourcemanager,
retrying in 1 ms: Could not connect to rpc endpoint under address
akka.tcp://flink@job-manager:6123/user/resourcemanager

It then does this for about 10 minutes(?), and then shuts down. If I'm
deploying a new cluster, this pod will try to register itself with the new
job manager before terminating later. This isn't a troubling issue as far as
I can tell, but I find it annoying that I sometimes have to force delete
the pods.

Any easy ways to just have the task managers terminate gracefully and
quickly?

Thanks,
Li


Re: Best way set max heap size via env variables or program arguments?

2020-01-02 Thread Li Peng
Awesome, thanks!

On Wed, Jan 1, 2020 at 6:17 PM Xintong Song  wrote:

> Hi Li,
>
> Regarding your questions:
>
> 1. Is there actually a way to pass in the heap size via arguments to
>> taskmanager.sh? Is passing -Dtaskmanager.heap.size supposed to work?
>
>
> No, '-Dtaskmanager.heap.size' is not supposed to work. The '-D'
> configurations are only parsed after the JVM is started, while
> 'taskmanager.heap.size' will be used for starting the JVM.
>
> 2.  If not, is there a recommended way to set the heap size by
>> environment, like environmental variables?
>
>
> Yes, there is an equivalent environment variable, FLINK_TM_HEAP.
>
>
> 3. Also, the maximum heap size logged and -Xms and -Xmx are always a little
>> smaller than the configured size (e.g. configuring 3000m results in 2700m in
>> the jvm arguments, 1024m results in 922m); why is that?
>
>
> The configuration key 'taskmanager.heap.size' is a bit inaccurate. The
> config option actually also accounts for some off-heap memory, such as
> network direct buffers and off-heap managed memory (if used). That's why
> you see the Java heap size is always slightly smaller than the configured
> 'taskmanager.heap.size'.
>
>
> Thank you~
>
> Xintong Song
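(A small sketch of that suggestion, using the values from the original question; FLINK_JM_HEAP and FLINK_TM_HEAP are read by the 1.9-era start scripts before the JVM is launched, unlike the -D flags, so exporting them in the container/environment definition should take effect.)

export FLINK_JM_HEAP=9000m   # read by jobmanager.sh / standalone-job.sh
export FLINK_TM_HEAP=7000m   # read by taskmanager.sh

bin/taskmanager.sh start-foreground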
>
>
>
> On Wed, Jan 1, 2020 at 3:10 AM Li Peng  wrote:
>
>> Hey folks, we've been running a k8s Flink application, using the
>> taskmanager.sh script and passing in the -Djobmanager.heap.size=9000m and
>> -Dtaskmanager.heap.size=7000m as options to the script. I noticed from the
>> logs that the maximum heap size completely ignores these arguments,
>> and just sets the heap to the default of 922M.
>>
>> I tested setting the taskmanager.heap.size and jobmanager.heap.size
>> manually in flink-conf.yaml, and it does work as expected (minus the heap
>> being set a little lower than configured). But since we want the
>> application to pick up different memory settings based on the environment
>> (local/staging/prod/etc), setting it in flink-conf isn't ideal.
>>
>> So my questions are:
>>
>> 1. Is there actually a way to pass in the heap size via arguments to
>> taskmanager.sh? Is passing -Dtaskmanager.heap.size supposed to work?
>> 2.  If not, is there a recommended way to set the heap size by
>> environment, like environmental variables?
>> 3. Also, the maximum heap size logged and -Xms and -Xmx are always a
>> little smaller than the configured size (e.g. configuring 3000m results in
>> 2700m in the jvm arguments, 1024m results in 922m); why is that?
>>
>> Thanks, and happy new year!
>> Li
>>
>>


Best way set max heap size via env variables or program arguments?

2019-12-31 Thread Li Peng
Hey folks, we've been running a k8s Flink application, using the
taskmanager.sh script and passing in the -Djobmanager.heap.size=9000m and
-Dtaskmanager.heap.size=7000m as options to the script. I noticed from the
logs that the maximum heap size completely ignores these arguments,
and just sets the heap to the default of 922M.

I tested setting the taskmanager.heap.size and jobmanager.heap.size
manually in flink-conf.yaml, and it does work as expected (minus the heap
being set a little lower than configured). But since we want the
application to pick up different memory settings based on the environment
(local/staging/prod/etc), setting it in flink-conf isn't ideal.

So my questions are:

1. Is there actually a way to pass in the heap size via arguments to
taskmanager.sh? Is passing -Dtaskmanager.heap.size supposed to work?
2.  If not, is there a recommended way to set the heap size by environment,
like environmental variables?
3. Also, the maximum heap size logged and -Xms and -Xmx are always a little
smaller than the configured size (e.g. configuring 3000m results in 2700m in
the jvm arguments, 1024m results in 922m); why is that?

Thanks, and happy new year!
Li


Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-12 Thread Li Peng
Ok I think I identified the issue:

1. I accidentally bundled another version of slf4j in my job jar, which
resulted in some incompatibility with the slf4j jar bundled with flink/bin.
Apparently slf4j in this case defaults to something that ignores the conf.
Once I removed slf4j from my job jar, the logger properties were properly
consumed.
2. It looks like the line log4j.appender.file.file=${log.file} in the default
properties didn't work properly (resulting in log4j null errors); it
started working after I just set it manually to opt/flink/logs/output.log.

Thanks for your guidance!
Li
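(A hedged sketch of how one could check a job jar for the kind of conflicting logging classes described in point 1; the jar name is a placeholder.)

# a second slf4j/log4j binding bundled into the fat jar can shadow the one
# shipped in the Flink distribution and silently ignore your configuration
jar tf my-job.jar | grep -iE 'slf4j|log4j|StaticLoggerBinder'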

On Thu, Dec 12, 2019 at 12:09 PM Li Peng  wrote:

> Hey ouywl, interesting, I figured something like that would happen. I
> actually replaced all the log4j-x files with the same config I originally
> posted, including log4j-console, but that didn't change the behavior either.
>
> Hey Yang, yes I verified the properties files are as I configured, and
> that the logs don't match up with it. Here are the JVM arguments, if that's
> what you were looking for:
>
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM:
> OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.232-b09
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>  Maximum heap size: 989 MiBytes
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>  JAVA_HOME: /usr/local/openjdk-8
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>  Hadoop version: 2.8.3
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM
> Options:
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Xms1024m
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Xmx1024m
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>  Program Arguments:
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> --configDir
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> /opt/flink/conf
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Djobmanager.rpc.address=myjob-manager
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dparallelism.default=2
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dblob.server.port=6124
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dqueryable-state.server.ports=6125
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Djobmanager.heap.size=3000m
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dtaskmanager.heap.size=3000m
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dmetrics.reporter.stsd.class=org.apache.flink.metrics.statsd.StatsDReporter
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dmetrics.reporter.stsd.host=myhost.com
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dmetrics.reporter.stsd.port=8125
> [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dmetrics.system-resource=true
>
> Thanks,
> Li
>
> On Thu, Dec 12, 2019 at 4:40 AM ouywl  wrote:
>
>> @Li Peng
>> I found your problem. Your start command uses the "start-foreground"
>> argument, so it runs exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME}
>> "${ARGS[@]}", and in flink-console.sh the code is log_setting=(
>> "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties"
>> "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml").
>> So log4j.properties does not take effect; you need log4j-console.properties and
>> logback-console.xml instead.
>>
>> ouywl
>> ou...@139.com
>>
>>
>> On 12/12/2019 15:35,ouywl  wrote:
>>
>> Hi Yang,
>> Could you give more detail? The log4j.properties content, and the k8s
>> yaml. Are you using the Dockerfile in flink-container? When I tested it with the
>> default per-job yaml in flink-container, it only showed logs in the docker
>> output, and no logs in /opt/flink/log.
>>
>> ouywl
>> ou...@139.com
>>
>

Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-12 Thread Li Peng
Hey ouywl, interesting, I figured something like that would happen. I
actually replaced all the log4j-x files with the same config I originally
posted, including log4j-console, but that didn't change the behavior either.

Hey Yang, yes I verified the properties files are as I configured, and that
the logs don't match up with it. Here are the JVM arguments, if that's what
you were looking for:

[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM:
OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.232-b09
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
 Maximum heap size: 989 MiBytes
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
 JAVA_HOME: /usr/local/openjdk-8
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Hadoop
version: 2.8.3
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM
Options:
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Xms1024m
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Xmx1024m
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
 Program Arguments:
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
--configDir
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
/opt/flink/conf
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Djobmanager.rpc.address=myjob-manager
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dparallelism.default=2
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dblob.server.port=6124
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dqueryable-state.server.ports=6125
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Djobmanager.heap.size=3000m
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dtaskmanager.heap.size=3000m
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dmetrics.reporter.stsd.class=org.apache.flink.metrics.statsd.StatsDReporter
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dmetrics.reporter.stsd.host=myhost.com
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dmetrics.reporter.stsd.port=8125
[main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dmetrics.system-resource=true

Thanks,
Li

On Thu, Dec 12, 2019 at 4:40 AM ouywl  wrote:

> @Li Peng
> I found your problem. Your start command uses the "start-foreground" argument,
> so it runs exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME}
> "${ARGS[@]}", and in flink-console.sh the code is log_setting=(
> "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties"
> "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml").
> So log4j.properties does not take effect; you need log4j-console.properties and
> logback-console.xml instead.
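(In other words, a minimal sketch assuming an image built on the official Flink image: the desired settings also have to land in the -console variants, because flink-console.sh points the JVM at those files rather than at log4j.properties.)

# e.g. in the Docker build or an entrypoint step, before the start scripts run
cp /opt/flink/conf/log4j.properties /opt/flink/conf/log4j-console.properties
# only needed if logback is used instead of log4j
cp /opt/flink/conf/logback.xml /opt/flink/conf/logback-console.xml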
>
> ouywl
> ou...@139.com
>
>
> On 12/12/2019 15:35,ouywl  wrote:
>
> Hi Yang,
> Could you give more detail? The log4j.properties content, and the k8s
> yaml. Are you using the Dockerfile in flink-container? When I tested it with the
> default per-job yaml in flink-container, it only showed logs in the docker
> output, and no logs in /opt/flink/log.
>
> ouywl
> ou...@139.com
>
>
> On 12/12/2019 13:47,Yang Wang
>  wrote:
>
> Hi Peng,
>
> What I mean is to use `docker exec` to get into the running pod and `ps` to get
> the real
> command that is running for the jobmanager.
> Have you checked that the /opt/flink/conf/log4j.properties is right?
>
> I have tested standalone per-job on my kubernetes cluster, the logs show
> up as expected.
>
>
> Best,
> Yang
>
> Li Peng  wrote on Thursday, December 12, 2019 at 2:59 AM:
>
>> Hey Yang, here are the commands:
>>
>> "/opt/flink/bin/taskmanager.sh",
>> "start-foreground",
>> "-Djobmanager.rpc.address={{ .Chart.Name }}-job-manager",
>> "-Dtaskmanager.numberOfTaskSlots=1&quo

Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-11 Thread Li Peng
Hey Yang, here are the commands:

"/opt/flink/bin/taskmanager.sh",
"start-foreground",
"-Djobmanager.rpc.address={{ .Chart.Name }}-job-manager",
"-Dtaskmanager.numberOfTaskSlots=1"

"/opt/flink/bin/standalone-job.sh",
"start-foreground",
"-Djobmanager.rpc.address={{ .Chart.Name }}-job-manager",
"-Dparallelism.default={{ .Values.task.replicaCount }}"

Yes it's very curious that I don't see any logs actually written to
/opt/flink/log.

On Tue, Dec 10, 2019 at 11:17 PM Yang Wang  wrote:

> Could you find the logs under /opt/flink/log/jobmanager.log? If not,
> please share the
> commands the JobManager and TaskManager are using. If the command is
> correct
> and the log4j under /opt/flink/conf is as expected, it is curious why we
> could not get the logs.
>
>
> Best,
> Yang
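(The same checks expressed with kubectl, as a sketch; the pod name is a placeholder.)

# which log4j file was the JVM actually started with?
kubectl exec -it <taskmanager-pod> -- sh -c 'ps -ef | grep java'
# what configuration actually made it into the pod?
kubectl exec -it <taskmanager-pod> -- cat /opt/flink/conf/log4j.properties
# is anything being written to the log directory?
kubectl exec -it <taskmanager-pod> -- ls -l /opt/flink/log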
>
> Li Peng  wrote on Wednesday, December 11, 2019 at 1:24 PM:
>
>> Ah I see. I think the Flink app is reading files from
>> /opt/flink/conf correctly as it is, since changes I make to flink-conf are
>> picked up as expected, it's just the log4j properties that are either not
>> being used, or don't apply to stdout or whatever source k8 uses for its
>> logs? Given that the pods don't seem to have logs written to file
>> anywhere, contrary to the properties, I'm inclined to say it's the former
>> and that the log4j properties just aren't being picked up. Still have no
>> idea why though.
>>
>> On Tue, Dec 10, 2019 at 6:56 PM Yun Tang  wrote:
>>
>>> Sure, /opt/flink/conf is mounted as a volume from the configmap.
>>>
>>>
>>>
>>> Best
>>>
>>> Yun Tang
>>>
>>>
>>>
>>> *From: *Li Peng 
>>> *Date: *Wednesday, December 11, 2019 at 9:37 AM
>>> *To: *Yang Wang 
>>> *Cc: *vino yang , user 
>>> *Subject: *Re: Flink on Kubernetes seems to ignore log4j.properties
>>>
>>>
>>>
>>> 1. Hey Yun, I'm calling /opt/flink/bin/standalone-job.sh and
>>> /opt/flink/bin/taskmanager.sh on my job and task managers respectively.
>>> It's based on the setup described here:
>>> http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/
>>>  .
>>> I haven't tried the configmap approach yet, does it also replace the conf
>>> files in /opt/flink/conf?
>>>
>>> 2. Hey Vino, here's a sample of the kubernetes:
>>> https://pastebin.com/fqJrgjZu  I didn't change any patterns from the
>>> default, so the string patterns should look the same, but as you can see
>>> it's full of info checkpoint logs that I originally was trying to suppress.
>>> Based on my log4j.properties, the level should be set to WARN. I couldn't
>>> actually find any .out files on the pod, this is from the kubectl logs
>>> command. I also didn't see any files in /opt/flink/log, which I thought my
>>> log4j was specified to do, hence me thinking that the properties weren't
>>> actually being consumed. I also have the same properties in my
>>> src/main/resources folder.
>>>
>>> 3. Hey Yang, yes this is a standalone session cluster. I did specify in
>>> the docker file to copy the log4j.properties to the /opt/flink/conf folder
>>> on the image, and I confirmed that the properties are correct when I bash'd
>>> into the pod and viewed them manually.
>>>
>>>
>>>
>>> Incidentally, I also tried passing the -Dlog4j.configuration argument to
>>> the programs, and it doesn't work either. And based on what I'm reading on
>>> jira, that option is not really supported anymore?
>>>
>>>
>>>
>>> Thanks for your responses, folks!
>>>
>>> Li
>>>
>>>
>>>
>>> On Mon, Dec 9, 2019 at 7:10 PM Yang Wang  wrote:
>>>
>>> Hi Li Peng,
>>>
>>>
>>>
>>> You are running standalone session cluster or per-job cluster on
>>> kubernetes. Right?
>>>
>>> If so, i think you need to check your log4j.properties in the image, not
>>> local. The log is
>>>
>>> stored to /opt/flink/log/jobmanager.log by default.
>>>
>>>
>>>
>>> If you are running active Kubernetes integration for a fresh taste. The
>>> following cli option
>>>
>>> could be used to remove the redirect.
>>>
>>> -Dkubernetes.container-start-command-template="%java% %classpath%
>>> %jvmmem% %jvmopts% %logging% %class% %args%"
>>>
>>>
>>>
>>> Best,
>>>
>>>

Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-10 Thread Li Peng
Ah I see. I think the Flink app is reading files from
/opt/flink/conf correctly as it is, since changes I make to flink-conf are
picked up as expected; it's just the log4j properties that are either not
being used, or don't apply to stdout or whatever source k8s uses for its
logs. Given that the pods don't seem to have logs written to file
anywhere, contrary to the properties, I'm inclined to say it's the former
and that the log4j properties just aren't being picked up. I still have no
idea why, though.

On Tue, Dec 10, 2019 at 6:56 PM Yun Tang  wrote:

> Sure, /opt/flink/conf is mounted as a volume from the configmap.
>
>
>
> Best
>
> Yun Tang
>
>
>
> *From: *Li Peng 
> *Date: *Wednesday, December 11, 2019 at 9:37 AM
> *To: *Yang Wang 
> *Cc: *vino yang , user 
> *Subject: *Re: Flink on Kubernetes seems to ignore log4j.properties
>
>
>
> 1. Hey Yun, I'm calling /opt/flink/bin/standalone-job.sh and
> /opt/flink/bin/taskmanager.sh on my job and task managers respectively.
> It's based on the setup described here:
> http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/ .
> I haven't tried the configmap approach yet, does it also replace the conf
> files in /opt/flink/conf?
>
> 2. Hey Vino, here's a sample of the kubernetes:
> https://pastebin.com/fqJrgjZu  I didn't change any patterns from the
> default, so the string patterns should look the same, but as you can see
> it's full of info checkpoint logs that I originally was trying to suppress.
> Based on my log4j.properties, the level should be set to WARN. I couldn't
> actually find any .out files on the pod, this is from the kubectl logs
> command. I also didn't see any files in /opt/flink/log, which I thought my
> log4j was specified to do, hence me thinking that the properties weren't
> actually being consumed. I also have the same properties in my
> src/main/resources folder.
>
> 3. Hey Yang, yes this is a standalone session cluster. I did specify in
> the docker file to copy the log4j.properties to the /opt/flink/conf folder
> on the image, and I confirmed that the properties are correct when I bash'd
> into the pod and viewed them manually.
>
>
>
> Incidentally, I also tried passing the -Dlog4j.configuration argument to
> the programs, and it doesn't work either. And based on what I'm reading on
> jira, that option is not really supported anymore?
>
>
>
> Thanks for your responses, folks!
>
> Li
>
>
>
> On Mon, Dec 9, 2019 at 7:10 PM Yang Wang  wrote:
>
> Hi Li Peng,
>
>
>
> You are running standalone session cluster or per-job cluster on
> kubernetes. Right?
>
> If so, i think you need to check your log4j.properties in the image, not
> local. The log is
>
> stored to /opt/flink/log/jobmanager.log by default.
>
>
>
> If you are running active Kubernetes integration for a fresh taste. The
> following cli option
>
> could be used to remove the redirect.
>
> -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem%
> %jvmopts% %logging% %class% %args%"
>
>
>
> Best,
>
> Yang
>
>
>
> vino yang  wrote on Tuesday, December 10, 2019 at 10:55 AM:
>
> Hi Li,
>
>
>
> A potential reason could be conflicting logging frameworks. Can you share
> the log in your .out file and let us know if the print format of the log is
> the same as the configuration file you gave.
>
>
>
> Best,
>
> Vino
>
>
>
> Li Peng  wrote on Tuesday, December 10, 2019 at 10:09 AM:
>
> Hey folks, I noticed that my kubernetes flink logs (reached via *kubectl
> logs *) completely ignore any of the configurations I put into
> /flink/conf/. I set the logger level to WARN, yet I still see INFO level
> logging from flink loggers
> like org.apache.flink.runtime.checkpoint.CheckpointCoordinator. I even
> copied the same properties to /flink/conf/log4j-console.properties
> and log4j-cli.properties.
>
>
>
> From what I can tell, kubernetes just listens to stdout and stderr, so
> shouldn't the log4j.properties control output to them? Anyone seen this
> issue before?
>
>
>
> Here is my log4j.properties:
>
>
> # This affects logging for both user code and Flink
> log4j.rootLogger=WARN, file, console, stdout
>
> # Uncomment this if you want to _only_ change Flink's logging
> log4j.logger.org.apache.flink=WARN
>
> # The following lines keep the log level of common libraries/connectors on
> # log level INFO. The root logger does not override this. You have to manually
> # change the log levels here.
> log4j.logger.akka=INFO
> log4j.logger.org.apache.kafka=INFO
> log4j.logger.org.apache.hadoop=INFO
> log4j.logger.org.apache.zookeeper=INFO
>
> # Log all infos in the given file
> lo

Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-10 Thread Li Peng
1. Hey Yun, I'm calling /opt/flink/bin/standalone-job.sh and
/opt/flink/bin/taskmanager.sh on my job and task managers respectively.
It's based on the setup described here:
http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/ .
I haven't tried the configmap approach yet, does it also replace the conf
files in /opt/flink/conf?

2. Hey Vino, here's a sample of the kubernetes logs:
https://pastebin.com/fqJrgjZu  I didn't change any patterns from the
default, so the string patterns should look the same, but as you can see
it's full of info checkpoint logs that I originally was trying to suppress.
Based on my log4j.properties, the level should be set to WARN. I couldn't
actually find any .out files on the pod, this is from the kubectl logs
command. I also didn't see any files in /opt/flink/log, which I thought my
log4j was specified to do, hence me thinking that the properties weren't
actually being consumed. I also have the same properties in my
src/main/resources folder.

3. Hey Yang, yes this is a standalone session cluster. I did specify in the
docker file to copy the log4j.properties to the /opt/flink/conf folder on
the image, and I confirmed that the properties are correct when I bash'd
into the pod and viewed them manually.

Incidentally, I also tried passing the -Dlog4j.configuration argument to
the programs, and it doesn't work either. And based on what I'm reading on
jira, that option is not really supported anymore?

Thanks for your responses, folks!
Li

On Mon, Dec 9, 2019 at 7:10 PM Yang Wang  wrote:

> Hi Li Peng,
>
> You are running standalone session cluster or per-job cluster on
> kubernetes. Right?
> If so, i think you need to check your log4j.properties in the image, not
> local. The log is
> stored to /opt/flink/log/jobmanager.log by default.
>
> If you are running active Kubernetes integration for a fresh taste. The
> following cli option
> could be used to remove the redirect.
> -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem%
> %jvmopts% %logging% %class% %args%"
>
> Best,
> Yang
>
>> vino yang  wrote on Tuesday, December 10, 2019 at 10:55 AM:
>
>> Hi Li,
>>
>> A potential reason could be conflicting logging frameworks. Can you share
>> the log in your .out file and let us know if the print format of the log is
>> the same as the configuration file you gave.
>>
>> Best,
>> Vino
>>
>> Li Peng  wrote on Tuesday, December 10, 2019 at 10:09 AM:
>>
>>> Hey folks, I noticed that my kubernetes flink logs (reached via *kubectl
>>> logs *) completely ignore any of the configurations I put
>>> into /flink/conf/. I set the logger level to WARN, yet I still see INFO
>>> level logging from flink loggers
>>> like org.apache.flink.runtime.checkpoint.CheckpointCoordinator. I even
>>> copied the same properties to /flink/conf/log4j-console.properties
>>> and log4j-cli.properties.
>>>
>>> From what I can tell, kubernetes just listens to stdout and stderr, so
>>> shouldn't the log4j.properties control output to them? Anyone seen this
>>> issue before?
>>>
>>> Here is my log4j.properties:
>>>
>>>
>>> # This affects logging for both user code and Flink
>>> log4j.rootLogger=WARN, file, console, stdout
>>>
>>> # Uncomment this if you want to _only_ change Flink's logging
>>> log4j.logger.org.apache.flink=WARN
>>>
>>> # The following lines keep the log level of common libraries/connectors on
>>> # log level INFO. The root logger does not override this. You have to manually
>>> # change the log levels here.
>>> log4j.logger.akka=INFO
>>> log4j.logger.org.apache.kafka=INFO
>>> log4j.logger.org.apache.hadoop=INFO
>>> log4j.logger.org.apache.zookeeper=INFO
>>>
>>> # Log all infos in the given file
>>> log4j.appender.file=org.apache.log4j.FileAppender
>>> log4j.appender.file.file=${log.file}
>>> log4j.appender.file.append=false
>>> log4j.appender.file.layout=org.apache.log4j.PatternLayout
>>> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>>>
>>> # Log all infos to the console
>>> log4j.appender.console=org.apache.log4j.ConsoleAppender
>>> log4j.appender.console.layout=org.apache.log4j.PatternLayout
>>> log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>>>
>>> # Suppress the irrelevant (wrong) warnings from the Netty channel handler
>>> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file, console
>>> log4j.logger.org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction=WARN
>>> log4j.logger.org.apache.flink.runtime.checkpoint=WARN
>>>
>>> Thanks,
>>> Li
>>>
>>


Re: StreamingFileSink doesn't close multipart uploads to s3?

2019-12-06 Thread Li Peng
Ok I seem to have solved the issue by enabling checkpointing. Based on the
docs
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html>
(I'm
using 1.9.0), it seemed like only StreamingFileSink.forBulkFormat()
should've required checkpointing, but based on this
experience, StreamingFileSink.forRowFormat() requires it too! Is this the
intended behavior? If so, the docs should probably be updated.

Thanks,
Li

On Fri, Dec 6, 2019 at 2:01 PM Li Peng  wrote:

> Hey folks, I'm trying to get StreamingFileSink to write to s3 every
> minute, with flink-s3-fs-hadoop, and based on the default rolling policy,
> which is configured to "roll" every 60 seconds, I thought that would be
> automatic (I interpreted rolling to mean actually close a multipart upload
> to s3).
>
> But I'm not actually seeing files written to s3 at all, instead I see a
> bunch of open multipart uploads when I check the AWS s3 console, for
> example:
>
>  "Uploads": [
> {
> "Initiated": "2019-12-06T20:57:47.000Z",
> "Key": "2019-12-06--20/part-0-0"
> },
> {
> "Initiated": "2019-12-06T20:57:47.000Z",
> "Key": "2019-12-06--20/part-1-0"
> },
> {
> "Initiated": "2019-12-06T21:03:12.000Z",
> "Key": "2019-12-06--21/part-0-1"
> },
> {
> "Initiated": "2019-12-06T21:04:15.000Z",
> "Key": "2019-12-06--21/part-0-2"
> },
> {
> "Initiated": "2019-12-06T21:22:23.000Z"
> "Key": "2019-12-06--21/part-0-3"
> }
> ]
>
> And these uploads are being open for a long time. So far after an hour,
> none of the uploads have been closed. Is this the expected behavior? If I
> wanted to get these uploads to actually write to s3 quickly, do I need to
> configure the hadoop stuff to get that done, like setting a smaller
> buffer/partition size to force it to upload
> <https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#How_S3A_writes_data_to_S3>
> ?
>
> Thanks,
> Li
>


StreamingFileSink doesn't close multipart uploads to s3?

2019-12-06 Thread Li Peng
Hey folks, I'm trying to get StreamingFileSink to write to s3 every minute,
with flink-s3-fs-hadoop, and based on the default rolling policy, which is
configured to "roll" every 60 seconds, I thought that would be automatic (I
interpreted rolling to mean actually close a multipart upload to s3).

But I'm not actually seeing files written to s3 at all, instead I see a
bunch of open multipart uploads when I check the AWS s3 console, for
example:

 "Uploads": [
{
"Initiated": "2019-12-06T20:57:47.000Z",
"Key": "2019-12-06--20/part-0-0"
},
{
"Initiated": "2019-12-06T20:57:47.000Z",
"Key": "2019-12-06--20/part-1-0"
},
{
"Initiated": "2019-12-06T21:03:12.000Z",
"Key": "2019-12-06--21/part-0-1"
},
{
"Initiated": "2019-12-06T21:04:15.000Z",
"Key": "2019-12-06--21/part-0-2"
},
{
"Initiated": "2019-12-06T21:22:23.000Z"
"Key": "2019-12-06--21/part-0-3"
}
]

And these uploads are being open for a long time. So far after an hour,
none of the uploads have been closed. Is this the expected behavior? If I
wanted to get these uploads to actually write to s3 quickly, do I need to
configure the hadoop stuff to get that done, like setting a smaller
buffer/partition size to force it to upload
<https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#How_S3A_writes_data_to_S3>
?

Thanks,
Li


Re: What S3 Permissions does StreamingFileSink need?

2019-12-06 Thread Li Peng
Ah, I figured it out after all; it turns out it was due to KMS encryption on
the bucket. I needed to add KMS permissions for the IAM role, otherwise there
is an unauthorized error. Thanks for your help!

On Fri, Dec 6, 2019 at 2:34 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hey Li,
>
> > my permissions is as listed above
> As I understand it, it's a terraform script above. But what are the actual
> permissions in AWS?
> And it also makes sense to make sure that they are associated with the
> right role, and the role with the right user.
>
> > Maybe I need to add the directory level as a resource?
> You don't have to.
>
> If it's possible in your setup, you can debug by granting all s3
> permissions to all objects, like this:
> actions   = ["s3:*"]
> resources = ["*"]
>
> Regards,
> Roman
>
>
> On Fri, Dec 6, 2019 at 12:15 AM Li Peng  wrote:
>
>> Hey Roman, my permissions is as listed above, and here's the
>> error message I get:
>>
>> ava.lang.Exception:
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>> Could not forward element to next operator
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by:
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>> Could not forward element to next operator
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
>> Caused by:
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>> Could not forward element to next operator
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(

Re: What S3 Permissions does StreamingFileSink need?

2019-12-05 Thread Li Peng
)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.initiateMultiPartUpload(WriteOperationHelper.java:198)
at
org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.startMultiPartUpload(HadoopS3AccessHelper.java:66)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.newUpload(RecoverableMultiPartUploadImpl.java:245)
at
org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.getNewRecoverableUpload(S3RecoverableMultipartUploadFactory.java:68)
at
org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.open(S3RecoverableWriter.java:76)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:221)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
... 26 more
Caused by:
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
Access Denied (Service: Amazon S3; Status Code: 403; Error Code:
AccessDenied; Request ID: ; S3 Extended Request ID: , S3 Extended
Request ID: 
at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
at
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
at
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3152)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3AFileSystem.java:2597)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$initiateMultiPartUpload$0(WriteOperationHelper.java:199)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
... 42 more

Maybe I need to add the directory level as a resource?

resources = [
 "arn:aws:s3:::bucket-name",
 "arn:aws:s3:::bucket-name/",
 "arn:aws:s3:::bucket-name/*"
]

Thanks,
Li

On Thu, Dec 5, 2019 at 6:11 AM r_khachatryan 
wrote:

> Hi Li,
>
> Could you please list the permissions you see and the error message you
> receive from AWS?
>
>
> Li Peng-2 wrote
> > Hey folks, I'm trying to use StreamingFileSink with s3, using IAM roles
> > for
> > auth. Does anyone know what permissions the role should have for the
> > specified s3 bucket to work properly? I've been getting some auth errors,
> > and I suspect I'm missing some permissions:
>
> Regards,
> Roman
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


What S3 Permissions does StreamingFileSink need?

2019-12-04 Thread Li Peng
Hey folks, I'm trying to use StreamingFileSink with s3, using IAM roles for
auth. Does anyone know what permissions the role should have for the
specified s3 bucket to work properly? I've been getting some auth errors,
and I suspect I'm missing some permissions:


data "aws_iam_policy_document" "s3_policy_document" {
  version = "2012-10-17"

  statement {
actions = [
  "s3:AbortMultipartUpload",
  "s3:CreateBucket",
  "s3:DeleteObject",
  "s3:Get*",
  "s3:List*",
  "s3:PutBucketVersioning",
  "s3:PutObject",
  "s3:PutObjectTagging"
]

resources = [
  "arn:aws:s3:::bucket-name",
  "arn:aws:s3:::bucket-name/*"
]
  }
}

Maybe the CreateBucket permission doesn't work for creating buckets within
sub-buckets?

Thanks,
Li


Streaming Files to S3

2019-11-25 Thread Li Peng
Hey folks, I'm trying to stream large volume data and write them as csv
files to S3, and one of the restrictions is to try and keep the files to
below 100MB (compressed) and write one file per minute. I wanted to verify
with you guys regarding my understanding of StreamingFileSink:

1. From the docs, StreamingFileSink will use multipart upload with s3, so
even with many workers writing to s3, it will still output only one file
for all of them for each time window, right?
2. StreamingFileSink.forRowFormat can be configured to write individual
rows and then commit to disk as per the above rules, by specifying a
RollingPolicy with the file size limit and the rollover interval, correct?
And do the limit and the interval apply to the entire file, not to each part
file? (See the sketch below this message.)
3. To write to s3, is it enough to just add flink-s3-fs-hadoop as a
dependency and specify the file path as "s3://file"?

Thanks,
Li
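
Regarding points 2 and 3, a minimal sketch, assuming a Flink 1.9-era API;
the bucket name and output path are placeholders, and flink-s3-fs-hadoop
is assumed to be on the classpath so the s3:// scheme resolves:

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

// Row-format sink that rolls a part file at ~100 MB or after 60 seconds,
// whichever comes first.
val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path("s3://bucket-name/output"), new SimpleStringEncoder[String]("UTF-8"))
  .withRollingPolicy(
    DefaultRollingPolicy.create()
      .withMaxPartSize(100L * 1024 * 1024)
      .withRolloverInterval(60 * 1000L)
      .withInactivityInterval(60 * 1000L)
      .build())
  .build()

Note that each parallel subtask writes its own part files (named
part-<subtask>-<counter>) rather than one file for all subtasks, and that
on S3 the rolled files only become visible once a checkpoint completes.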


Re: Streaming data to Segment

2019-11-21 Thread Li Peng
Awesome, I'll definitely try that out, thanks!

On Wed, Nov 20, 2019 at 9:36 PM Yuval Itzchakov  wrote:

> Hi Li,
>
> You're in the right direction. One additional step would be to use
> RichSinkFunction[Data] instead of SinkFunction[Data], which exposes open and
> close functions that allow you to initialize and dispose of resources
> properly.
>
> On Thu, 21 Nov 2019, 5:23 Li Peng,  wrote:
>
>> Hey folks, I'm interested in streaming some data to Segment
>> <https://segment.com/docs/sources/server/java/>, using their existing
>> java library. This is a pretty high throughput stream, so I wanted each
>> parallel operator to have its own instance of the segment client. From what
>> I could tell, defining a custom SinkFunction should be able to satisfy
>> this, as each parallel operator gets its own SinkFunction object
>> automatically. So my code looks like this:
>>
>> class SegmentSink() extends SinkFunction[Data] {
>>
>>   @transient
>>   val segmentClient: Analytics = Analytics.builder("key").build()
>>
>>   override def invoke(value: Data, context: SinkFunction.Context[_]): Unit = 
>> {
>> segmentClient.enqueue(...)
>>   }
>> }
>>
>> Can anyone verify if this is the right pattern for me to use? Is there
>> any risk of the SinkFunction getting repeatedly serialized/deserialized
>> which results in new segment clients getting created each time?
>>
>> Thanks,
>> Li
>>
>


Streaming data to Segment

2019-11-20 Thread Li Peng
Hey folks, I'm interested in streaming some data to Segment
<https://segment.com/docs/sources/server/java/>, using their existing java
library. This is a pretty high throughput stream, so I wanted each
parallel operator to have its own instance of the segment client. From what
I could tell, defining a custom SinkFunction should be able to satisfy
this, as each parallel operator gets its own SinkFunction object
automatically. So my code looks like this:

class SegmentSink() extends SinkFunction[Data] {

  @transient
  val segmentClient: Analytics = Analytics.builder("key").build()

  override def invoke(value: Data, context: SinkFunction.Context[_]): Unit = {
segmentClient.enqueue(...)
  }
}

Can anyone verify if this is the right pattern for me to use? Is there any
risk of the SinkFunction getting repeatedly serialized/deserialized which
results in new segment clients getting created each time?

Thanks,
Li
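
A sketch of the RichSinkFunction variant suggested in the reply above.
Data, Analytics, and the enqueue call are placeholders carried over from
the snippet in this message, and the shutdown() call assumes the Segment
client exposes such a hook:

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class SegmentSink() extends RichSinkFunction[Data] {

  // Built once per parallel sink instance in open(), so the client itself
  // is never serialized with the sink.
  @transient private var segmentClient: Analytics = _

  override def open(parameters: Configuration): Unit = {
    segmentClient = Analytics.builder("key").build()
  }

  override def invoke(value: Data, context: SinkFunction.Context[_]): Unit = {
    segmentClient.enqueue(...) // same placeholder call as in the original snippet
  }

  override def close(): Unit = {
    // assumes the client has a shutdown/flush hook to drain queued events
    if (segmentClient != null) segmentClient.shutdown()
  }
}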


Re: Connection refused error when writing to socket?

2017-01-31 Thread Li Peng
Yes, I did open a socket with netcat. It turns out my first error was due
to a stream without a sink triggering the socket connect (I thought that
without a sink the stream wouldn't affect anything, so I didn't comment it
out, and I didn't open the socket for that port).

However, I did play with it some more, and I think the real issue is that
I'm trying to have two streams, one writing to a port and another reading
from the same port, i.e.

val y = executionEnvironment.socketTextStream("localhost", 9000)
x.writeToSocket("localhost", 9000, new SimpleStringSchema())

Once I tested just write or just the read it worked, but combined I
get this error:

java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.read1(BufferedReader.java:210)
at java.io.BufferedReader.read(BufferedReader.java:286)
at java.io.Reader.read(Reader.java:140)
at 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:101)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
at java.lang.Thread.run(Thread.java:745)

Is this operation not allowed?

And I'm mainly writing to the same socket in order to pass work back
and forth between streams.
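
For context, a sketch of what the two calls in question do, assuming the
1.x-era Scala API used here: both open client connections to the given
host and port, and neither starts a server, so whatever is listening on
9000 (e.g. netcat) would have to accept both connections and relay data
from the writer to the reader for this pattern to work, which a plain
netcat listener does not do.

// Both of these are client connections; nothing in the job listens on 9000.
val y = executionEnvironment.socketTextStream("localhost", 9000) // reads lines from the listening server
x.writeToSocket("localhost", 9000, new SimpleStringSchema())     // writes serialized records to that server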


Connection refused error when writing to socket?

2017-01-30 Thread Li Peng
Hi there,

I'm trying to test a couple of things by having my stream write to a
socket, but it keeps failing to connect (I'm trying to have a stream
write to a socket, and have another stream read from that socket).

Caused by: java.net.ConnectException: Connection refused (Connection refused)

at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at 
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
at java.lang.Thread.run(Thread.java:745)

I tried writeToSocket(parameterTool.get("localhost"),
parameterTool.getInt(9000), new SimpleStringSchema), and even a custom
sink:

addSink(w => {
  val ia = InetAddress.getByName("localhost")
  val socket = new Socket(ia, 9000)
  val outStream = socket.getOutputStream
  val out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(outStream)))
  out.println(w)
  out.flush()
  out.close()
})

But none of this seems to work. I'm fairly sure I set up the server
correctly, since I can connect to it via telnet and other dummy
echo clients I wrote. I can also have my data stream read from that
same socket without any issues, but I can't seem to tell my stream to
write to this socket without the above connection refused error
showing up. Is there some nuance here I'm missing?

Thanks!


Proper ways to write iterative DataSets with dependencies

2017-01-26 Thread Li Peng
Hi there, I just started investigating Flink and I'm curious if I'm
approaching my issue in the right way.

My current usecase is modeling a series of transformations, where I
start with some transformations, which when done can yield another
transformation, or a result to output to some sink, or a Join
operation that will extract data from some other data set and combine
it with existing data (and output a transformation that should be
processed like any other transform). The transformations and results
are easy to deal with, but joins are more troubling.

Here is my current solution that I got to work:

//initialSolution contains all external data to join on.
initialSolution.iterateDelta(initialWorkset, 1, Array("id")) {
  (solution: DataSet[Work[String]], workset: DataSet[Work[String]]) => {
    // handle joins separately
    val joined = handleJoins(solution, workset.filter(isJoin(_)))

    // transformations are handled separately as well
    val transformed = handleTransformations(workset.filter(isTransform(_)))

    val nextWorkSet = transformed.filter(isWork(_)).union(joined)
    val solutionUpdate = transformed.filter(isResult(_))
    (solutionUpdate, nextWorkSet)
  }
}

My questions are:

1. Is this the right way to use Flink? Based on the documentation
(correct me if I'm wrong) it seems that in the iterative case the
external data (to be used in the join) should be in the solution
DataSet, so if this usecase has multiple external data sources to join
on, they are all collected in the initial solution DataSet. Would
having all of this different data in the solution have bad
repercussions for partitioning/performance?
2. Doing the joins as part of the iteration seems a bit wrong to me (I
might just be thinking about the issue in the wrong way). I
alternatively tried to model this approach as a series of DataStreams,
where the code is pretty much the same as above, but where the
iteration occurs on stream T, which splits off to two streams J and R,
where R is just the result sink, and J has the logic that joins
incoming data, and after the join sends the result back to stream T.
But I didn't see a good way to say "send result of J back to T, and
run all the standard iterative logic on that" using the data stream
API. I could manually create some endpoints for these streams to hit
and thus achieve this behavior, but is there an easy way I'm missing
that can achieve this via the flink api?

Thanks,
Li