Re: Flink SQL + savepoint

2019-10-31 Thread Yun Tang
Hi Fanbin

If you do not change the parallelism or add and remove operators, you can still 
use a savepoint to resume your Flink SQL jobs.

However, as far as I know, Flink SQL does not currently configure operator uids, 
while I'm pretty sure the blink branch contains the logic for setting the uid on the stream node. 
[1]

I have already CC'ed Kurt, as he can provide more detailed information on this.

[1] 
https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/util/resource/StreamNodeUtil.java#L44

Best
Yun Tang


From: Fanbin Bu 
Date: Thursday, October 31, 2019 at 1:17 PM
To: user 
Subject: Flink SQL + savepoint

Hi,

It is highly recommended that we assign a uid to each operator for the sake of 
savepoints. How do we do this for Flink SQL? According to 
https://stackoverflow.com/questions/55464955/how-to-add-uid-to-operator-in-flink-table-api,
 it is not possible.
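
For context, this is what the recommendation looks like in the DataStream API (a rough
sketch; the stream, function and operator names below are made up), and the question is
how to get an equivalent, stable uid when the pipeline is defined with Flink SQL:

DataStream<Event> enriched = events
    .map(new MyEnrichmentFunction())
    .uid("enrich-events")   // stable uid used to match operator state on savepoint restore
    .name("enrich-events");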

Does that mean, I can't use savepoint to restart my program if I use Flink SQL?

Thanks,

Fanbin


Re: Sending custom statsd tags

2019-10-31 Thread vino yang
Hi Prakhar,

You need to customize StatsDReporter [1] from the Flink source.

If you want to flexibly read configurable tags from the configuration
file [2], you can refer to the implementation of DatadogHttpReporter#open [3]
(as a reference only for how to read the tags; a rough sketch follows below the links).

Best,
Vino

[1]:
https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java#L52
[2]:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#datadog-orgapacheflinkmetricsdatadogdatadoghttpreporter
[3]:
https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java#L114
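
To illustrate the direction, here is a minimal, hedged sketch; the class name and the
"tags" config key are assumptions rather than Flink API, and because StatsDReporter's
send() method is private you may end up copying the reporter instead of subclassing it
in order to actually attach the tags to each metric line:

package com.example.metrics;

import java.util.Arrays;
import java.util.List;

import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.statsd.StatsDReporter;

public class TaggedStatsDReporter extends StatsDReporter {

    private List<String> tags;

    @Override
    public void open(MetricConfig config) {
        super.open(config);
        // Mirrors DatadogHttpReporter#open: read a comma-separated tag list
        // from the reporter configuration, e.g.
        //   metrics.reporter.stsd.tags: env:prod,team:data
        String rawTags = config.getString("tags", "");
        this.tags = Arrays.asList(rawTags.split(","));
        // The tags then need to be attached wherever the statsd line is
        // built, in whatever tag convention telegraf expects.
    }
}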


Prakhar Mathur  于2019年10月31日周四 下午2:35写道:

> Hi Chesnay,
>
> Thanks for the response, can you point me to some existing example for
> this?
>
> On Wed, Oct 30, 2019 at 5:30 PM Chesnay Schepler 
> wrote:
>
>> Not possible, you'll have to extend the StatsDReporter yourself to add
>> arbitrary tags.
>>
>> On 30/10/2019 12:52, Prakhar Mathur wrote:
>>
>> Hi,
>>
>> We are running Flink 1.6.2. We are using flink-metrics-statsd jar in
>> order to send metrics to telegraf. In order to send custom metrics, we are
>> using MetricGroups. Currently, we are trying to send a few custom tags
>> but unable to find any examples or documentation regarding the same.
>>
>> Regards
>> Prakhar Mathur
>>
>>
>>


RemoteEnvironment cannot execute job from local.

2019-10-31 Thread Simon Su


Hi all 
   I want to test submitting a job from my local IDE, and I have deployed a Flink 
cluster in my VM. 
   Here is my code, taken from the Flink 1.9 documentation with some of my own parameters added.
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment
.createRemoteEnvironment("localhost", 8081, "/tmp/myudf.jar");

DataSet<String> data = env.readTextFile("/tmp/file");

data
.filter(new FilterFunction<String>() {
public boolean filter(String value) {
return value.startsWith("http://");
}
})
.writeAsText("/tmp/file1");

env.execute();
}

When I run the program, it raises an error like this: 


Exception in thread "main" 
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 
1f32190552e955bb2048c31930edfb0e)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:301)
at 
org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:209)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:186)
at 
org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
at TestMain.main(TestMain.java:25)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 8 more
Caused by: java.lang.RuntimeException: The initialization of the DataSource's 
outputs caused an error: Could not read the user code wrapper: TestMain$1
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could 
not read the user code wrapper: TestMain$1
at 
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
at 
org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1448)
at 
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39)
at 
org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1315)
at 
org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:317)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
... 3 more
Caused by: java.lang.ClassNotFoundException: TestMain$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at 
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
... 9 more


My understanding is that when using remote environment, I don’t need to upload 
my program j

Re: The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged

2019-10-31 Thread Yang Wang
I think Till's analysis is right. I just want to share some more information.

After diving into the logs of the Flink resource manager and the Yarn resource
manager, I found that the excess
containers come from two sides.

** Yarn Container Allocation Mechanism **
Receiving more containers than requested is unavoidable. Imagine that we want
to allocate 120 containers
from Yarn. The size of the container request in *heartbeat1* will be 120.
Suppose the Yarn RM receives the
request but cannot allocate any container because there are not enough resources,
so the allocated containers in the
response to *heartbeat1* will be 0. The Flink resource manager does not get
any containers and will
keep the size of the container request in *heartbeat2* at 120. However, the Yarn
resource manager has allocated
120 containers between *heartbeat1* and *heartbeat2*. When the Yarn resource
manager receives *heartbeat2*, it will
put those 120 containers into the response to *heartbeat2* and start
allocating for the new request of 120. Since
the Flink resource manager has now received all the containers it needs, it will
set the size of the container request in *heartbeat3* to 0.
The Yarn resource manager allocates another 100 containers between *heartbeat2* and
*heartbeat3* and puts those 100 containers
into the response to *heartbeat3*. So the Flink resource manager gets 100 excess
containers.

Note: heartbeat here means the heartbeat between the Flink resource manager (Yarn
client) and the Yarn resource manager.


** Flink resource manager allocates more than it really needs **
Currently, in onContainersAllocated of FlinkYarnResourceManager, we iterate
through each container,
and processing each one takes more than 50 ms. Most of the time is spent
uploading {uuid}-taskmanager-conf.yaml to HDFS
and starting the container. So if more than 10 containers are allocated at once,
FlinkYarnResourceManager cannot remove
the container requests in time and will allocate more than it really needs.


For the first cause, on the Yarn side, we cannot do anything more from Flink. However,
for the second, we could reduce the time
cost of each allocated container so that FlinkYarnResourceManager only allocates
what it really needs. We could have two optimizations
here. The first is to use NMClientAsync instead of NMClient to reduce the
container start time. [1] The
second is to *not* upload {uuid}-taskmanager-conf.yaml, and to use Java options or
environment variables instead. [2]




1.https://issues.apache.org/jira/browse/FLINK-13184
2. https://issues.apache.org/jira/browse/FLINK-14582

Chan, Regina  于2019年10月31日周四 上午5:09写道:

> Just to provide a little bit of context, this behavior is highly
> problematic since we run these jobs at scale. This one job when running on
> 1.6 over allocated *2500* containers. On 1.9, with a one-minute heartbeat
> interval, we were able to bring that number of excess containers down to
> 230. My fear is that 230 excess containers is due to the fact that we also
> moved this to a smaller cluster so that it doesn’t have the potential of
> causing wider impact it did on the main cluster. We have over 70K jobs
> running in a day so imagine how bad this could become so I definitely
> appreciate your attention to this.
>
>
>
> I’m open to a minimum and max number of TaskExecutors, the max number is
> probably the biggest concern. Can help test this whenever it’s ready and
> again greatly appreciate it.
>
>
>
> Separately I think this loosely ties to into another thread on the dlist
> so far which is the usecase of per program or per job semantics. As we have
> one program representing a datastore’s batch with one or many jobs per
> dataset representing the stages of processing. Using this paradigm I think
> of the program having a pool of resources to be used by the job(s) with
> some expiry. The pool can be enlarged through additional requests when
> needed with the jobs acquiring and releasing back to the pool without
> having to send new requests to YARN.
>
>
>
> I believe perfect usage of this pool behavior would bring down the total
> requests from the 540 to 120 as 120+230=350 which means there’s still an
> additional request of 190 containers.
>
>
>
>
>
>
>
>
>
> *From:* Till Rohrmann 
> *Sent:* Wednesday, October 30, 2019 2:01 PM
> *To:* Yang Wang 
> *Cc:* Chan, Regina [Engineering] ; user <
> user@flink.apache.org>
> *Subject:* Re: The RMClient's and YarnResourceManagers internal state
> about the number of pending container requests has diverged
>
>
>
> Hi Regina, sorry for not getting back to you earlier. I've gone through
> the logs and I couldn't find something suspicious. What I can see though is
> the following:
>
>
>
> When you start the cluster, you submit a couple of jobs. This starts at
> 9:20. In total 120 slots are being required to run these jobs. Since you
> start a TaskExecutor with a single slot, you need 120 containers to run all
> jobs. Flink has sent all container requests by 9:21:40. So far so good.
>
>
>
> Shortly after, the cluster receives the first allocated containers.
> However, it lasts until 9:29:58 that Flink has received all 120 containers.
> I as

Async operator with a KeyedStream

2019-10-31 Thread bastien dine
Hello,

I would like to know whether you can use a KeyedStream with the async operator:
I want to use the async operator to insert some stuff into my database, but I
want to limit it to one request per element (with key=id) at a time.
With a regular keyBy / map it works, but it's too slow (I don't have
enough resources to increase my parallelism).

As far as I have seen, this is not possible.
When I write something like
Async.orderedWait(myStream.keyBy(myKeySelector)), the keyBy is totally
ignored.
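
For illustration, the call described above looks roughly like this (the stream, the
key selector and the AsyncFunction are placeholders); the keyBy only partitions the
input, and the async operator does not enforce any per-key ordering or per-key
request limit:

DataStream<Result> results = AsyncDataStream.orderedWait(
    myStream.keyBy(myKeySelector),   // partitioning only; not used for per-key serialization
    new MyAsyncDatabaseLookup(),     // an AsyncFunction<Element, Result>
    1000, TimeUnit.MILLISECONDS,     // timeout per async request
    100);                            // max in-flight requests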

Do you have a solution for this?

Best Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread Averell
Hi Kant,

I wonder why you need to "source" your intermediate state from files? Why
not "source" it from the previous operator? I.e. instead of (A join B) ->
State -> files -> (C), why not do (A join B) -> State -> (files + C)?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: possible backwards compatibility issue between 1.8->1.9?

2019-10-31 Thread Piotr Nowojski
Hi,

(This question is more appropriate for the user mailing list, not dev - when 
responding to my e-mail please remove dev mailing list from the recipients, 
I’ve kept it just FYI that discussion has moved to user mailing list).

Could it be that the problem is caused by the change in the chaining strategy of the 
AsyncWaitOperator in 1.9, as explained in the release notes [1]?

> AsyncIO
> Due to a bug in the AsyncWaitOperator, in 1.9.0 the default chaining 
> behaviour of the operator is now changed so that it 
> is never chained after another operator. This should not be problematic for 
> migrating from older version snapshots as 
> long as an uid was assigned to the operator. If an uid was not assigned to 
> the operator, please see the instructions here [2]
> for a possible workaround.
>
> Related issues:
>
>   • FLINK-13063: AsyncWaitOperator shouldn’t be releasing 
> checkpointingLock [3]

Piotrek

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.9.html#asyncio
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/upgrading.html#matching-operator-state
 

[3] https://issues.apache.org/jira/browse/FLINK-13063 


> On 30 Oct 2019, at 16:52, Bekir Oguz  wrote:
> 
> Hi guys,
> during our upgrade from 1.8.1 to 1.9.1, one of our jobs fail to start with
> the following exception. We deploy the job with 'allow-non-restored-state'
> option and from the latest checkpoint dir of the 1.8.1 version.
> 
> org.apache.flink.util.StateMigrationException: The new state typeSerializer
> for operator state must not be incompatible.
>at org.apache.flink.runtime.state.DefaultOperatorStateBackend
> .getListState(DefaultOperatorStateBackend.java:323)
>at org.apache.flink.runtime.state.DefaultOperatorStateBackend
> .getListState(DefaultOperatorStateBackend.java:214)
>at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator
> .initializeState(AsyncWaitOperator.java:272)
>at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .initializeState(AbstractStreamOperator.java:281)
>at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(
> StreamTask.java:881)
>at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
> .java:395)
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>at java.lang.Thread.run(Thread.java:748)
> 
> We see from the Web UI that the 'async wait operator' is causing this,
> which is not changed at all during this upgrade.
> 
> All other jobs are migrated without problems, only this one is failing. Has
> anyone else experienced this during migration?
> 
> Regards,
> Bekir Oguz



Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread kant kodali
Hi Averell,

I want to write intermediate results (A join B) incrementally and in
real-time to some external storage so I can query it using SQL.
I am new to Flink, so I am trying to find out: 1) does such a mechanism exist?
2) If not, what are the alternatives?

Thanks

On Thu, Oct 31, 2019 at 1:42 AM Averell  wrote:

> Hi Kant,
>
> I wonder why you need to "source" your intermediate state from files? Why
> not "source" it from the previous operator? I.e. instead of (A join B) ->
> State -> files -> (C), why not do (A join B) -> State -> (files + C)?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Are Dynamic tables backed by rocksdb?

2019-10-31 Thread kant kodali
Hi All,

Are dynamic tables backed by RocksDB or kept in memory? If they are backed by
RocksDB, can I use SQL to query the state?

Thanks!


Stateful functions presentation code (UI part)

2019-10-31 Thread Flavio Pompermaier
Hi to all,
would it be possible to also provide the source code of the UI part of the
ride-sharing example? It would be interesting to me to see how the UI reads
the data from the Kafka egress.

Best,
Flavio


Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread Averell
Hi Kant,

Not sure what you meant by "query it using SQL". Do you mean running
ad-hoc SQL queries on that joined data? If that's what you meant, then
you'll need some SQL server first, and then write the joined data to that SQL
server. Elasticsearch and Cassandra are ready-to-use options. Writing a
custom sink function that writes to your own SQL server is also a
not-so-difficult solution (see the sketch below).
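
A minimal, hedged sketch of that custom-sink option; the JDBC URL, the table layout
and the JoinedRecord type are made up for illustration, and a production sink would
add batching, retries and checkpoint-aware flushing:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class JdbcUpsertSink extends RichSinkFunction<JoinedRecord> {

    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(
            "jdbc:postgresql://dbhost:5432/analytics", "user", "secret");
        statement = connection.prepareStatement(
            "INSERT INTO joined_result (id, payload) VALUES (?, ?) "
                + "ON CONFLICT (id) DO UPDATE SET payload = EXCLUDED.payload");
    }

    @Override
    public void invoke(JoinedRecord value, Context context) throws Exception {
        // One upsert per joined record; the row is queryable immediately.
        statement.setString(1, value.getId());
        statement.setString(2, value.getPayload());
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (statement != null) { statement.close(); }
        if (connection != null) { connection.close(); }
    }
}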

Best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Are Dynamic tables backed by rocksdb?

2019-10-31 Thread Fabian Hueske
Hi,

Dynamic tables might not be persisted at all; they are only persisted when it is necessary
for the computation of a query.
For example, a simple "SELECT * FROM t WHERE a = 1" query on an append-only
table t does not require persisting t.

However, there are a bunch of operations that require storing some parts
of a dynamic table, but not necessarily the full table.
All operators that need to store data put it into regular Flink state. This
means that all state is stored in the configured state backend.
You cannot directly query the data from there, but you could use the State
Processor API (see the sketch below). However, this also assumes that you know the
internal data representation that the operators use.
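
A very rough sketch of that State Processor API option (it requires the
flink-state-processor-api dependency; the savepoint path, the operator uid, the
state backend and the MyStateRow/MyJoinStateReader types are placeholders, and you
must know the operator's state descriptors to implement the reader):

// classes from org.apache.flink.state.api and org.apache.flink.runtime.state.memory
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Load the savepoint/checkpoint with the same kind of state backend that wrote it.
ExistingSavepoint savepoint = Savepoint.load(
    env, "hdfs:///savepoints/savepoint-xyz", new MemoryStateBackend());

// MyJoinStateReader would be a KeyedStateReaderFunction that registers the
// operator's state descriptors and emits one MyStateRow per key.
DataSet<MyStateRow> rows = savepoint.readKeyedState("my-join-uid", new MyJoinStateReader());
rows.print();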

Best, Fabian


Am Do., 31. Okt. 2019 um 09:51 Uhr schrieb kant kodali :

> Hi All,
>
> Are Dynamic tables backed by Rocksdb or in memory? if they are backed by
> RocksDB can I use SQL to query the state?
>
> Thanks!
>


Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Gyula Fóra
Hi all,

Regarding the compilation part:

I think there are upsides and downsides to building the Flink job (running the
main method) on the client side; however, since this is the current way of
doing it, we should have a very powerful reason to change the default
behaviour.
While there is a possible workaround for reading local resources on the
client (upload to HDFS), if this is a common pattern then it makes no sense
to force users into this workaround.

If we want to introduce server-side building of jobs, in my opinion we should provide
both options for users to choose from, and the client side
should probably stay the default.

Cheers,
Gyula



On Thu, Oct 31, 2019 at 2:18 AM tison  wrote:

> Thanks for your attentions!
>
> @shixiaoga...@gmail.com 
>
> Yes correct. We try to avoid jobs affect one another. Also a local
> ClusterClient
> in case saves the overhead about retry before leader elected and persist
> JobGraph before submission in RestClusterClient as well as the net cost.
>
> @Paul Lam 
>
> 1. Here is already a note[1] about multiple part jobs. I am also confused
> a bit
> on this concept at first :-) Things go in similar way if you program
> contains the
> only JobGraph so that I think per-program acts like per-job-graph in this
> case
> which provides compatibility for many of one job graph program.
>
> Besides, we have to respect user program which doesn't with current
> implementation because we return abruptly when calling env#execute which
> hijack user control so that they cannot deal with the job result or the
> future of
> it. I think this is why we have to add a detach/attach option.
>
> 2. For compilation part, I think it could be a workaround that you upload
> those
> resources in a commonly known address such as HDFS so that compilation
> can read from either client or cluster.
>
> Best,
> tison.
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430
>
>
> Newport, Billy  于2019年10月30日周三 下午10:41写道:
>
>> We execute multiple job graphs routinely because we cannot submit a
>> single graph without it blowing up. I believe Regina spoke to this in
>> Berlin during her talk. We instead if we are processing a database
>> ingestion with 200 tables in it, we do a job graph per table rather than a
>> single job graph that does all tables instead. A single job graph can be in
>> the tens of thousands of nodes in our largest cases and we have found flink
>> (as os 1.3/1.6.4) cannot handle graphs of that size. We’re currently
>> testing 1.9.1 but have not retested the large graph scenario.
>>
>>
>>
>> Billy
>>
>>
>>
>>
>>
>> *From:* Paul Lam [mailto:paullin3...@gmail.com]
>> *Sent:* Wednesday, October 30, 2019 8:41 AM
>> *To:* SHI Xiaogang
>> *Cc:* tison; dev; user
>> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>>
>>
>>
>> Hi,
>>
>>
>>
>> Thanks for starting the discussion.
>>
>>
>>
>> WRT the per-job semantic, it looks natural to me that per-job means
>> per-job-graph,
>>
>> because in my understanding JobGraph is the representation of a job.
>> Could you
>>
>> share some use case in which a user program should contain multiple job
>> graphs?
>>
>>
>>
>> WRT the per-program mode, I’m also in flavor of a unified cluster-side
>> execution
>>
>> for user program, so +1 from my side.
>>
>>
>>
>> But I think there may be some values for the current per-job mode: we now
>> have
>>
>> some common resources available on the client machine that would be read
>> by main
>>
>> methods in user programs. If migrated to per-program mode, we must
>> explicitly
>>
>> set the specific resources for each user program and ship them to the
>> cluster,
>>
>> it would be a bit inconvenient.  Also, as the job graph is compiled at
>> the client,
>>
>> we can recognize the errors caused by user code before starting the
>> cluster
>>
>> and easily get access to the logs.
>>
>>
>>
>> Best,
>>
>> Paul Lam
>>
>>
>>
>> 在 2019年10月30日,16:22,SHI Xiaogang  写道:
>>
>>
>>
>> Hi
>>
>>
>>
>> Thanks for bringing this.
>>
>>
>>
>> The design looks very nice to me in that
>>
>> 1. In the new per-job mode, we don't need to compile user programs in the
>> client and can directly run user programs with user jars. That way, it's
>> easier for resource isolation in multi-tenant platforms and is much safer.
>>
>> 2. The execution of user programs can be unified in session and per-job
>> modes. In session mode, user jobs are submitted via a remote ClusterClient
>> while in per-job mode user jobs are submitted via a local ClusterClient.
>>
>>
>>
>> Regards,
>>
>> Xiaogang
>>
>>
>>
>> tison  于2019年10月30日周三 下午3:30写道:
>>
>> (CC user list because I think users may have ideas on how per-job mode
>> should look like)
>>
>>
>>
>> Hi all,
>>
>> In the discussion about Flink on k8s[1] we encounter a problem that
>> opinions
>> diverge in how so-called per-job mode works. This thread is aimed at
>> stating
>> a dedi

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Yang Wang
Thanks to Tison for starting this exciting discussion. We also suffer a lot
from the per-job mode.
I think the per-job cluster is a dedicated cluster for only one job that
will not accept any other
jobs. It has the advantage of one-step submission: there is no need to start
the dispatcher first and
then submit the job. And it does not matter where the job graph is
generated and the job is submitted.
Now we have two cases.

(1) Current Yarn detached cluster. The job graph is generated in the client and
then shipped via the distributed
cache to the Flink master container, and the MiniDispatcher uses
`FileJobGraphRetrieve` to get it.
The job will be submitted on the Flink master side.


(2) Standalone per-job cluster. User jars are already built into the image, so
the job graph will be
generated on the Flink master side and `ClasspathJobGraphRetriver` is used to
get it. The job will
also be submitted on the Flink master side.


For both (1) and (2), only one job per user program can be supported. Per-job means
per job graph,
so it works just as expected.


Tison suggests adding a new mode, "per-program". The user jar will be
transferred to the Flink master
container, and a local client will be started there to generate the job graph and
submit the job. I think it could
cover all the functionality of the current per-job mode, both (1) and (2). Also,
detach mode and attach
mode could be unified. We would not need to start a session cluster to
simulate per-job for multi-part programs.


I am in favor of the "per-program" mode. Just two concerns:
1. How many users are using multiple jobs in one program?
2. Why not always use a session cluster to simulate per-job? Maybe
one-step submission
is a convincing reason.



Best,
Yang

tison  于2019年10月31日周四 上午9:18写道:

> Thanks for your attentions!
>
> @shixiaoga...@gmail.com 
>
> Yes correct. We try to avoid jobs affect one another. Also a local
> ClusterClient
> in case saves the overhead about retry before leader elected and persist
> JobGraph before submission in RestClusterClient as well as the net cost.
>
> @Paul Lam 
>
> 1. Here is already a note[1] about multiple part jobs. I am also confused
> a bit
> on this concept at first :-) Things go in similar way if you program
> contains the
> only JobGraph so that I think per-program acts like per-job-graph in this
> case
> which provides compatibility for many of one job graph program.
>
> Besides, we have to respect user program which doesn't with current
> implementation because we return abruptly when calling env#execute which
> hijack user control so that they cannot deal with the job result or the
> future of
> it. I think this is why we have to add a detach/attach option.
>
> 2. For compilation part, I think it could be a workaround that you upload
> those
> resources in a commonly known address such as HDFS so that compilation
> can read from either client or cluster.
>
> Best,
> tison.
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430
>
>
> Newport, Billy  于2019年10月30日周三 下午10:41写道:
>
>> We execute multiple job graphs routinely because we cannot submit a
>> single graph without it blowing up. I believe Regina spoke to this in
>> Berlin during her talk. We instead if we are processing a database
>> ingestion with 200 tables in it, we do a job graph per table rather than a
>> single job graph that does all tables instead. A single job graph can be in
>> the tens of thousands of nodes in our largest cases and we have found flink
>> (as os 1.3/1.6.4) cannot handle graphs of that size. We’re currently
>> testing 1.9.1 but have not retested the large graph scenario.
>>
>>
>>
>> Billy
>>
>>
>>
>>
>>
>> *From:* Paul Lam [mailto:paullin3...@gmail.com]
>> *Sent:* Wednesday, October 30, 2019 8:41 AM
>> *To:* SHI Xiaogang
>> *Cc:* tison; dev; user
>> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>>
>>
>>
>> Hi,
>>
>>
>>
>> Thanks for starting the discussion.
>>
>>
>>
>> WRT the per-job semantic, it looks natural to me that per-job means
>> per-job-graph,
>>
>> because in my understanding JobGraph is the representation of a job.
>> Could you
>>
>> share some use case in which a user program should contain multiple job
>> graphs?
>>
>>
>>
>> WRT the per-program mode, I’m also in flavor of a unified cluster-side
>> execution
>>
>> for user program, so +1 from my side.
>>
>>
>>
>> But I think there may be some values for the current per-job mode: we now
>> have
>>
>> some common resources available on the client machine that would be read
>> by main
>>
>> methods in user programs. If migrated to per-program mode, we must
>> explicitly
>>
>> set the specific resources for each user program and ship them to the
>> cluster,
>>
>> it would be a bit inconvenient.  Also, as the job graph is compiled at
>> the client,
>>
>> we can recognize the errors caused by user code before starting the
>> cluster
>>
>> and easily get access to the logs.
>>
>>
>>
>> Best,
>>
>> Pau

Re: RemoteEnvironment cannot execute job from local.

2019-10-31 Thread Till Rohrmann
In order to run the program on a remote cluster from the IDE you need to
first build the jar containing your user code. This jar needs to be passed
to createRemoteEnvironment() so that the Flink client knows which jar to
upload. Hence, please make sure that /tmp/myudf.jar contains your user code.
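
For example (paths are illustrative), the jar passed to createRemoteEnvironment() must
be the one built from your IDE project so that it contains TestMain and its anonymous
inner classes such as TestMain$1; additional jars can be passed as further varargs:

ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
    "localhost", 8081,
    "/path/to/my-program.jar",   // contains TestMain and TestMain$1
    "/tmp/myudf.jar");           // any additional UDF jars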

Cheers,
Till

On Thu, Oct 31, 2019 at 9:01 AM Simon Su  wrote:

>
> Hi all
>I want to test to submit a job from my local IDE and I deployed a Flink
> cluster in my vm.
>Here is my code from Flink 1.9 document and add some of my parameters.
>
> public static void main(String[] args) throws Exception {
> ExecutionEnvironment env = ExecutionEnvironment
> .createRemoteEnvironment("localhost", 8081, "/tmp/myudf.jar");
>
> DataSet<String> data = env.readTextFile("/tmp/file");
>
> data
> .filter(new FilterFunction<String>() {
> public boolean filter(String value) {
> return value.startsWith("http://");
> }
> })
> .writeAsText("/tmp/file1");
>
> env.execute();
> }
>
> When I run the program, I raises the error like:
>
> Exception in thread "main"
> org.apache.flink.client.program.ProgramInvocationException: Job failed.
> (JobID: 1f32190552e955bb2048c31930edfb0e)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:301)
> at
> org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:209)
> at
> org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:186)
> at
> org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
> at TestMain.main(TestMain.java:25)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
> ... 8 more
> *Caused by: java.lang.RuntimeException: The initialization of the
> DataSource's outputs caused an error: Could not read the user code wrapper:
> TestMain$1*
> at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> *Caused by:
> org.apache.flink.runtime.operators.util.CorruptConfigurationException:
> Could not read the user code wrapper: TestMain$1*
> at
> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
> at
> org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1448)
> at
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39)
> at
> org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
> at
> org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1315)
> at
> org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:317)
> at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
> ... 3 more
> *Caused by: java.lang.ClassNotFoundException: TestMain$1*
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at
> org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInp

Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread Piotr Nowojski
Hi,

Generally speaking it's a good question: why do you need to do this? What 
information do you need from the outer join's internal state? Can't you just 
process the result to obtain the same information in another way?


> Yes, I am looking for this but I am not sure how to do this? Should I use the 
> processFunction(like the event-driven applications) ?

Another basic question, are you using DataStream API, TableAPI or SQL?

Assuming TableAPI or SQL, you would have to split your query into three:

1. Left side of the join
2. Right side of the join
3. Downstream of the join (if any)

Next you would have to write your own DataStream API outer join operator 
(implement your own or copy/paste or inherit from the SQL’s/Table API 
operator), which has an additional side output [0] of the state changes that 
you want. To do this, you can probably go with two different approaches:

a) define CoProcessFunction 
b) define TwoInputStreamOperator

After that, you have to convert the queries from 1 and 2 into two separate 
DataStreams [1], connect them [2] and process [3] them with your CoProcessFunction 
(a) or transform [4] them with your TwoInputStreamOperator, and convert the result 
back from a DataStream to a Table [5].

[0] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
 

[1] for example StreamTableEnvironment#toRetractStream() or #toAppendStream()
[2] https://training.ververica.com/lessons/connected-streams.html 

[3] ConnectedStreams#process()
[4] ConnectedStreams#transform()
[5] 
StreamTableEnvironment#fromDataStream(org.apache.flink.streaming.api.datastream.DataStream)

However, keep in mind that in [5] there is probably no way to convert a 
DataStream with retractions/updates back into a Table, so your join operator 
would have to produce append-only output. A rough sketch of option (a) follows.
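
(The sketch below is hedged: tableEnv, leftTable, rightTable, the two key selectors,
the StateChange class and the join bookkeeping itself are placeholders, not Flink API,
and the actual outer-join state handling is omitted.)

DataStream<Row> left = tableEnv.toAppendStream(leftTable, Row.class);    // query 1
DataStream<Row> right = tableEnv.toAppendStream(rightTable, Row.class);  // query 2

// Side output carrying the internal state changes you want to observe.
final OutputTag<StateChange> stateChanges = new OutputTag<StateChange>("state-changes") {};

SingleOutputStreamOperator<Row> joined = left
    .connect(right)
    .keyBy(leftKey, rightKey)
    .process(new CoProcessFunction<Row, Row, Row>() {
        @Override
        public void processElement1(Row value, Context ctx, Collector<Row> out) throws Exception {
            // custom outer-join bookkeeping in keyed state goes here; emit
            // matched rows via out.collect(...) and publish every state
            // update to the side output:
            ctx.output(stateChanges, new StateChange(/* describe the update */));
        }

        @Override
        public void processElement2(Row value, Context ctx, Collector<Row> out) throws Exception {
            ctx.output(stateChanges, new StateChange(/* describe the update */));
        }
    });

DataStream<StateChange> changes = joined.getSideOutput(stateChanges);  // stream of state updates
Table result = tableEnv.fromDataStream(joined);                        // append-only result, as noted above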

Piotrek

> On 30 Oct 2019, at 20:45, kant kodali  wrote:
> 
> "I think you would have to implement your own custom operator that would 
> output changes to it’s internal state as a side output"
> 
> Yes, I am looking for this but I am not sure how to do this? Should I use the 
> processFunction(like the event-driven applications) ?
> 
> On Wed, Oct 30, 2019 at 8:53 AM Piotr Nowojski  > wrote:
> Hi Kant,
> 
> Checkpointing interval is configurable, but I wouldn’t count on it working 
> well with even 10s intervals. 
>  
> I think what you are this is not supported by Flink generically. Maybe 
> Queryable state I mentioned before? But I have never used it.
> 
> I think you would have to implement your own custom operator that would 
> output changes to it’s internal state as a side output.
> 
> Piotrek
> 
>> On 30 Oct 2019, at 16:14, kant kodali > > wrote:
>> 
>> Hi Piotr,
>> 
>> I am talking about the internal state. How often this state gets 
>> checkpointed? if it is every few seconds then it may not meet our real-time 
>> requirement(sub second).
>> 
>> The question really is can I read this internal state in a streaming fashion 
>> in an update mode? The state processor API seems to expose DataSet but not 
>> DataStream so I am not sure how to read internal state in streaming fashion 
>> in an update made?
>> 
>> Thanks!
>> 
>> On Wed, Oct 30, 2019 at 7:25 AM Piotr Nowojski > > wrote:
>> Hi,
>> 
>> I’m not sure what are you trying to achieve. What do you mean by “state of 
>> full outer join”? The result of it? Or it’s internal state? Also keep in 
>> mind, that internal state of the operators in Flink is already 
>> snapshoted/written down to an external storage during checkpointing 
>> mechanism.
>> 
>> The result should be simple, just write it to some Sink.
>> 
>> For the internal state, it sounds like you are doing something not the way 
>> it was intended… having said that, you can try one of the following options:
>> a) Implement your own outer join operator (might not be as easy if you are 
>> using Table API/SQL) and just create a side output for the state changes.
>> b) Use state processor API to read the content of a savepoint/checkpoint 
>> [1][2]
>> c) Use queryable state [3] (I’m not sure about this, I have never used 
>> queryable state)
>> 
>> Piotrek
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>  
>> 
>> [2] https://flink.apache.org/feature/2019/09/13/state-processor-api.html 
>> 
>> [3] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
>>  
>> 
>> 
>>> On 29 Oct 2019, at 16:42, kant kodali >> 

Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread kant kodali
Hi Averell,

yes, I want to run ad-hoc SQL queries on the joined data as well as on data
that may join in the future. For example, say you take datasets A
and B in streaming mode; a row in A can join with a row in B at some time in
the future, but meanwhile, if I query the intermediate state using SQL,
I want the rows in A that have not yet joined with B to also be available to
query. So not just the joined results alone, but also data that might join in
the future, since it's all streaming.

Thanks!


Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread Huyen Levan
Hi Kant,

So your problem statement is "ingest 2 streams into a data warehouse". The
main component of the solution, from my view, is that SQL server. You can
have a sink function insert the records of your two streams into two
different tables (A and B), or upsert them into one single table C. That upsert
action itself serves as the join function; there's no need to join in Flink
at all.

There are many tools out there that can be used for that ingestion. Flink, of
course, can be used for that purpose. But for me, it's overkill.

Regards,
Averell

On Thu, 31 Oct. 2019, 8:19 pm kant kodali,  wrote:

> Hi Averell,
>
> yes,  I want to run ad-hoc SQL queries on the joined data as well as data
> that may join in the future. For example, let's say if you take datasets A
> and B in streaming mode a row in A can join with a row B in some time in
> future let's say but meanwhile if I query the intermediate state using SQL
> I want the row in A that have not yet joined with B to also be available to
> Query. so not just joined results alone but also data that might be join in
> the future as well since its all streaming.
>
> Thanks!
>


Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Flavio Pompermaier
Hi all,
we use multiple jobs in one program a lot, and this is why: we fetch
data from a huge number of sources and, for each source, we do
some transformation and then want to write the union of all outputs
into a single directory (this assumes you're doing batch). When the number of
sources is large, if you want to do this in a single job, the graph becomes
very big, and this is a problem for several reasons:

   - too many subtasks / threads per slot
   - increased back pressure
   - if a single "sub-job" fails, the whole job fails; this is very annoying
   if it happens after half a day, for example
   - in our use case, the big-graph mode takes much longer than running
   each job separately (but maybe this is true only if you don't have many
   hardware resources)
   - debugging the cause of a failure can become a daunting task if the job
   graph is too large
   - we faced many strange errors when trying to run in the single big-job mode
   (due to serialization corruption)

So, summarizing, our overall experience with Flink batch is: the simpler
the job graph, the better!

Best,
Flavio


On Thu, Oct 31, 2019 at 10:14 AM Yang Wang  wrote:

> Thanks for tison starting this exciting discussion. We also suffer a lot
> from the per job mode.
> I think the per-job cluster is a dedicated cluster for only one job and
> will not accept more other
> jobs. It has the advantage of one-step submission, do not need to start
> dispatcher first and
> then submit the job. And it does not matter where the job graph is
> generated and job is submitted.
> Now we have two cases.
>
> (1) Current Yarn detached cluster. The job graph is generated in client
> and then use distributed
> cache to flink master container. And the MiniDispatcher uses
> `FileJobGraphRetrieve` to get it.
> The job will be submitted at flink master side.
>
>
> (2) Standalone per job cluster. User jars are already built into image. So
> the job graph will be
> generated at flink master side and `ClasspathJobGraphRetriver` is used to
> get it. The job will
> also be submitted at flink master side.
>
>
> For the (1) and (2), only one job in user program could be supported. The
> per job means
> per job-graph, so it works just as expected.
>
>
> Tison suggests to add a new mode "per-program”. The user jar will be
> transferred to flink master
> container, and a local client will be started to generate job graph and
> submit job. I think it could
> cover all the functionality of current per job, both (1) and (2). Also the
> detach mode and attach
> mode could be unified. We do not need to start a session cluster to
> simulate per job for multiple parts.
>
>
> I am in favor of the “per-program” mode. Just two concerns.
> 1. How many users are using multiple jobs in one program?
> 2. Why do not always use session cluster to simulate per job? Maybe
> one-step submission
> is a convincing reason.
>
>
>
> Best,
> Yang
>
> tison  于2019年10月31日周四 上午9:18写道:
>
>> Thanks for your attentions!
>>
>> @shixiaoga...@gmail.com 
>>
>> Yes correct. We try to avoid jobs affect one another. Also a local
>> ClusterClient
>> in case saves the overhead about retry before leader elected and persist
>> JobGraph before submission in RestClusterClient as well as the net cost.
>>
>> @Paul Lam 
>>
>> 1. Here is already a note[1] about multiple part jobs. I am also confused
>> a bit
>> on this concept at first :-) Things go in similar way if you program
>> contains the
>> only JobGraph so that I think per-program acts like per-job-graph in this
>> case
>> which provides compatibility for many of one job graph program.
>>
>> Besides, we have to respect user program which doesn't with current
>> implementation because we return abruptly when calling env#execute which
>> hijack user control so that they cannot deal with the job result or the
>> future of
>> it. I think this is why we have to add a detach/attach option.
>>
>> 2. For compilation part, I think it could be a workaround that you upload
>> those
>> resources in a commonly known address such as HDFS so that compilation
>> can read from either client or cluster.
>>
>> Best,
>> tison.
>>
>> [1]
>> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430
>>
>>
>> Newport, Billy  于2019年10月30日周三 下午10:41写道:
>>
>>> We execute multiple job graphs routinely because we cannot submit a
>>> single graph without it blowing up. I believe Regina spoke to this in
>>> Berlin during her talk. We instead if we are processing a database
>>> ingestion with 200 tables in it, we do a job graph per table rather than a
>>> single job graph that does all tables instead. A single job graph can be in
>>> the tens of thousands of nodes in our largest cases and we have found flink
>>> (as os 1.3/1.6.4) cannot handle graphs of that size. We’re currently
>>> testing 1.9.1 but have not retested the large graph scenario.
>>>
>>>
>>>
>>> 

Re: RemoteEnvironment cannot execute job from local.

2019-10-31 Thread Simon Su
Hi Till 
Thanks for your reply. Actually I modified the code like this:
I commented out the filter part and re-ran the code, and then it works well!! The 
jar passed to createRemoteEnvironment is a UDF jar, which does not contain my 
code.
My Flink version is 1.9.0, so I'm confused about the actual behavior of 
'createRemoteEnvironment'. Is this a potential bug?


ExecutionEnvironment env = ExecutionEnvironment
.createRemoteEnvironment("localhost", 8081, "/tmp/udfs.jar");

DataSet<String> data = env.readTextFile("/tmp/file");

data
//.filter(new FilterFunction<String>() {
//public boolean filter(String value) {
//return value.startsWith("http://");
//}
//})
.writeAsText("/tmp/file313");

env.execute();


Thanks,
SImon


On 10/31/2019 17:23,Till Rohrmann wrote:
In order to run the program on a remote cluster from the IDE you need to first 
build the jar containing your user code. This jar needs to passed to 
createRemoteEnvironment() so that the Flink client knows which jar to upload. 
Hence, please make sure that /tmp/myudf.jar contains your user code.


Cheers,
Till


On Thu, Oct 31, 2019 at 9:01 AM Simon Su  wrote:



Hi all 
   I want to test to submit a job from my local IDE and I deployed a Flink 
cluster in my vm. 
   Here is my code from Flink 1.9 document and add some of my parameters.
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment
.createRemoteEnvironment("localhost", 8081, "/tmp/myudf.jar");

DataSet<String> data = env.readTextFile("/tmp/file");

data
.filter(new FilterFunction<String>() {
public boolean filter(String value) {
return value.startsWith("http://");
}
})
.writeAsText("/tmp/file1");

env.execute();
}

When I run the program, I raises the error like: 


Exception in thread "main" 
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 
1f32190552e955bb2048c31930edfb0e)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:301)
at 
org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:209)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:186)
at 
org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
at TestMain.main(TestMain.java:25)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 8 more
Caused by: java.lang.RuntimeException: The initialization of the DataSource's 
outputs caused an error: Could not read the user code wrapper: TestMain$1
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could 
not read the user code wrapper: TestMain$1
at 
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
at 
org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1448)
at 
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39)
at 
org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1315)
at 
org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:317)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
... 3 more
Caused by: java.lang.ClassNotFoundException: TestMain$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream

Re: Stateful functions presentation code (UI part)

2019-10-31 Thread vino yang
Hi Flavio,

Please see this link.[1]

Best,
Vino

[1]:
https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples/stateful-functions-ridesharing-example

Flavio Pompermaier  于2019年10月31日周四 下午4:53写道:

> Hi to all,
> yould it be possible to provide also the source code of the UI part of the
> ride sharing example? It would be interesting to me how the UI is reading
> the data from the Kafka egress.
>
> Best,
> Flavio
>


Re: Stateful functions presentation code (UI part)

2019-10-31 Thread Flavio Pompermaier
Hi Vino,
I already checked that code but I can't find the UI part :(

On Thu, Oct 31, 2019 at 12:32 PM vino yang  wrote:

> Hi Flavio,
>
> Please see this link.[1]
>
> Best,
> Vino
>
> [1]:
> https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples/stateful-functions-ridesharing-example
>
> Flavio Pompermaier  于2019年10月31日周四 下午4:53写道:
>
>> Hi to all,
>> yould it be possible to provide also the source code of the UI part of
>> the ride sharing example? It would be interesting to me how the UI is
>> reading the data from the Kafka egress.
>>
>> Best,
>> Flavio
>>
>


Re: low performance in running queries

2019-10-31 Thread Habib Mostafaei
I have enclosed all logs from the run; for this run I used a parallelism of 
one. However, for the other runs I checked, I found that all parallel 
workers were working properly. Is there a simple way to get profiling 
information in Flink?


Best,

Habib

On 10/31/2019 2:54 AM, Zhenghua Gao wrote:

I think more runtime information would help figure out where the problem is.
1) how many parallelisms actually working
2) the metrics for each operator
3) the jvm profiling information, etc

*Best Regards,*
*Zhenghua Gao*


On Wed, Oct 30, 2019 at 8:25 PM Habib Mostafaei 
mailto:ha...@inet.tu-berlin.de>> wrote:


Thanks Gao for the reply. I used the parallelism parameter with
different values like 6 and 8 but still the execution time is not
comparable with a single threaded python script. What would be the
reasonable value for the parallelism?

Best,

Habib

On 10/30/2019 1:17 PM, Zhenghua Gao wrote:

The reason might be the parallelism of your task is only 1,
that's too low.
See [1] to specify proper parallelism  for your job, and the
execution time should be reduced significantly.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html

*Best Regards,*
*Zhenghua Gao*


On Tue, Oct 29, 2019 at 9:27 PM Habib Mostafaei
mailto:ha...@inet.tu-berlin.de>> wrote:

Hi all,

I am running Flink on a standalone cluster and getting very long
execution time for the streaming queries like WordCount for a
fixed text
file. My VM runs on a Debian 10 with 16 cpu cores and 32GB of
RAM. I
have a text file with size of 2GB. When I run the Flink on a
standalone
cluster, i.e., one JobManager and one taskManager with 25GB
of heapsize,
it took around two hours to finish counting this file while a
simple
python script can do it in around 7 minutes. Just wondering
what is
wrong with my setup. I ran the experiments on a cluster with six
taskManagers, but I still get very long execution time like
25 minutes
or so. I tried to increase the JVM heap size to have lower
execution
time but it did not help. I attached the log file and the Flink
configuration file to this email.

Best,

Habib



2019-10-30 15:59:47,117 INFO  org.apache.flink.client.cli.CliFrontend   
- 

2019-10-30 15:59:47,121 INFO  org.apache.flink.client.cli.CliFrontend   
-  Starting Command Line Client (Version: 1.8.2, Rev:6322618, 
Date:04.09.2019 @ 22:07:41 CST)
2019-10-30 15:59:47,121 INFO  org.apache.flink.client.cli.CliFrontend   
-  OS current user: xxx
2019-10-30 15:59:47,122 INFO  org.apache.flink.client.cli.CliFrontend   
-  Current Hadoop/Kerberos user: 
2019-10-30 15:59:47,122 INFO  org.apache.flink.client.cli.CliFrontend   
-  JVM: OpenJDK 64-Bit Server VM - AdoptOpenJDK - 1.8/25.232-b09
2019-10-30 15:59:47,123 INFO  org.apache.flink.client.cli.CliFrontend   
-  Maximum heap size: 7143 MiBytes
2019-10-30 15:59:47,123 INFO  org.apache.flink.client.cli.CliFrontend   
-  JAVA_HOME: /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/
2019-10-30 15:59:47,124 INFO  org.apache.flink.client.cli.CliFrontend   
-  No Hadoop Dependency available
2019-10-30 15:59:47,124 INFO  org.apache.flink.client.cli.CliFrontend   
-  JVM Options:
2019-10-30 15:59:47,124 INFO  org.apache.flink.client.cli.CliFrontend   
- -Dlog.file=/home/xxx/flink-1.8.2/log/flink-xxx-client-xxx.log
2019-10-30 15:59:47,125 INFO  org.apache.flink.client.cli.CliFrontend   
- 
-Dlog4j.configuration=file:/home/xxx/flink-1.8.2/conf/log4j-cli.properties
2019-10-30 15:59:47,125 INFO  org.apache.flink.client.cli.CliFrontend   
- 
-Dlogback.configurationFile=file:/home/xxx/flink-1.8.2/conf/logback.xml
2019-10-30 15:59:47,125 INFO  org.apache.flink.client.cli.CliFrontend   
-  Program Arguments:
2019-10-30 15:59:47,126 INFO  org.apache.flink.client.cli.CliFrontend   
- run
2019-10-30 15:59:47,126 INFO  org.apache.flink.client.cli.CliFrontend   
- examples/streaming/WordCount.jar
2019-10-30 15:59:47,126 INFO  org.apache.flink.client.cli.CliFrontend   
- --input
2019-10-30 15:59:47,127 INFO  org.apache.flink.client.cli.CliFrontend   
- ../test.txt
2019-10-30 15:59:47,127 INFO  org.apache.flink.client.cli.CliFrontend   
-  Classpath: 
/home/xxx/flink-1.8.2/lib/log4j-1.2.17.jar:/home/xxx/flink-1.8.2/lib/slf4j-log4j12-1.7.15.jar:/home/xxx/flink-1.8.2/lib/flink-dist_2.12-1.8.2.jar:::
2019-10-30 15:59:47,127 INFO  or

State restoration from checkpoint

2019-10-31 Thread Parth Sarathy

 
Hi,

Recently we upgraded our application, in which a Flink windowed
transformation is used. The earlier version of the application used Flink
1.7.2 while the new version uses Flink 1.8.2. When submitting the new job, the
application sets the path to the latest checkpoint directory as the restore path.
After the upgrade we see the statement below in the job manager log, which was as
expected:
2019-10-30 13:04:46.895 [flink-akka.actor.default-dispatcher-12] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Starting job
b8370f434690bd82ae413183b0826567 from savepoint
/data-processor/data/checkpoints/c7775bdc5d9f51f41576342946cf1037/chk-19089
(allowing non restored state)
But then the following error occurs, which prevents the new job from running:
2019-10-30 13:05:23.500 [flink-akka.actor.default-dispatcher-12] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph  -
graph-updates-stream (1/8) (2d07dc9bf1ef6ef3e4b7fdbf32943e4e) switched from
RUNNING to FAILED.java.lang.Exception: Exception while creating
StreamOperatorStateContext. at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
~[flink-dist_2.11-1.8.2.jar:1.8.2]  at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
~[flink-dist_2.11-1.8.2.jar:1.8.2]  at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:740)
~[flink-dist_2.11-1.8.2.jar:1.8.2]  at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:291)
~[flink-dist_2.11-1.8.2.jar:1.8.2]  at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
~[flink-dist_2.11-1.8.2.jar:1.8.2]  at java.lang.Thread.run(Thread.java:748)
~[na:1.8.0_212]Caused by: org.apache.flink.util.FlinkException: Could not
restore keyed state backend for
WindowOperator_6d6ae3c2fa6c08ed7dd466debdde6743_(1/8) from any of the 1
provided restore options.   at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
~[flink-dist_2.11-1.8.2.jar:1.8.2]  at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
~[flink-dist_2.11-1.8.2.jar:1.8.2]  at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
~[flink-dist_2.11-1.8.2.jar:1.8.2]  ... 5 common frames omittedCaused by:
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected
exception.  at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:324)
~[flink-dist_2.11-1.8.2.jar:1.8.2]  at
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
~[flink-dist_2.11-1.8.2.jar:1.8.2]  at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
~[flink-dist_2.11-1.8.2.jar:1.8.2]  at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
~[flink-dist_2.11-1.8.2.jar:1.8.2]  at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
~[flink-dist_2.11-1.8.2.jar:1.8.2]  ... 7 common frames omitted
*Caused by: java.io.FileNotFoundException:
/data-processor/data/checkpoints/6f8c503c32329e7ed1df3029ba1e2c74/shared/75d656bd-450e-42eb-ab40-edde993e3f12
(No such file or directory)*at java.io.FileInputStream.open0(Native Method)
~[na:1.8.0_212]...
Why is it looking for files in another job's directory when restoring
state? I have attached the metadata file from
/data-processor/data/checkpoints/c7775bdc5d9f51f41576342946cf1037/chk-19089.
It refers to multiple directories in
/data-processor/data/checkpoints/6f8c503c32329e7ed1df3029ba1e2c74/shared/.
Please clarify which checkpoint directories/files are used for
state restoration. Our requirement is to clean up all the job/checkpoint
directories which are no longer required.
Thanks,
Parth Sarathy



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Theo Diefenthal
I agree with Gyula Fora, 

In our case, we have a client machine in the middle between our YARN cluster 
and some backend services, which cannot be reached directly from the cluster 
nodes. On application startup, we connect to some external systems, get some 
information crucial for the job runtime and finally build up the job graph to 
be submitted. 

It is true that we could work around this, but it would be pretty annoying to 
connect to the remote services, collect the data, upload it to HDFS, start the 
job and make sure that housekeeping of those files is also done at some later time. 

The current behavior also corresponds to the behavior of Spark's driver mode, 
which made the transition from Spark to Flink easier for us. 

But I see the point, especially in terms of Kubernetes, and would thus also vote 
for an opt-in solution, with client-side compilation as the default and the 
per-program mode available as an option. 

Best regards 


From: "Flavio Pompermaier"  
To: "Yang Wang"  
CC: "tison" , "Newport, Billy" , 
"Paul Lam" , "SHI Xiaogang" , 
"dev" , "user"  
Sent: Thursday, October 31, 2019 10:45:36 
Subject: Re: [DISCUSS] Semantic and implementation of per-job mode 

Hi all, 
we use multiple jobs in one program a lot, and this is why: you fetch data from 
a huge number of sources and, for each source, you do some transformation, and 
then you want to write the union of all outputs into a single directory (this 
assumes you're doing batch). When the number of sources is large, if you want to 
do this in a single job, the graph becomes very big and this is a problem for 
several reasons: 


* too many subtasks/threads per slot 
* increased back pressure 
* if a single "sub-job" fails, the whole job fails... this is very annoying if 
this happens after half a day, for example 
* In our use case, the big-graph mode takes much longer than running each 
job separately (but maybe this is true only if you don't have many hardware 
resources) 
* debugging the cause of a failure can become a daunting task if the job 
graph is too large 
* we faced many strange errors when trying to run the single big-job mode 
(due to serialization corruption) 

So, to summarize our overall experience with Flink batch: the simpler the 
job graph, the better! 

Best, 
Flavio 


On Thu, Oct 31, 2019 at 10:14 AM Yang Wang  wrote: 



Thanks to tison for starting this exciting discussion. We also suffer a lot from 
the per-job mode. 
I think the per-job cluster is a dedicated cluster for only one job and will 
not accept any other jobs. It has the advantage of one-step submission: there is 
no need to start the dispatcher first and then submit the job. And it does not 
matter where the job graph is generated and the job is submitted. 
Now we have two cases. 

(1) The current YARN detached cluster. The job graph is generated in the client 
and then shipped to the Flink master container via the distributed cache. The 
MiniDispatcher uses `FileJobGraphRetriever` to get it. 
The job will be submitted on the Flink master side. 


(2) The standalone per-job cluster. User jars are already built into the image, 
so the job graph will be generated on the Flink master side and 
`ClassPathJobGraphRetriever` is used to get it. The job will also be submitted 
on the Flink master side. 


For both (1) and (2), only one job per user program can be supported. Per-job 
means per job graph, so it works just as expected. 


Tison suggests adding a new mode, "per-program". The user jar will be transferred 
to the Flink master container, and a local client will be started there to 
generate the job graph and submit the job. I think it could cover all the 
functionality of the current per-job mode, both (1) and (2). Also, the detached 
and attached modes could be unified. We would not need to start a session 
cluster to simulate per-job for multi-part programs. 


I am in favor of the “per-program” mode. Just two concerns: 
1. How many users are using multiple jobs in one program? 
2. Why not always use a session cluster to simulate per-job? Maybe one-step 
submission is a convincing reason. 


Best, 
Yang 

tison  wrote on Thursday, October 31, 2019 at 9:18 AM: 


Thanks for your attentions! 

@shixiaoga...@gmail.com 

Yes, correct. We try to avoid jobs affecting one another. A local ClusterClient 
in this case also saves the overhead of retrying before the leader is elected 
and of persisting the JobGraph before submission in RestClusterClient, as well 
as the network cost. 

@Paul Lam 

1. There is already a note [1] about multi-part jobs. I was also a bit confused 
by this concept at first :-) Things work in a similar way if your program 
contains only one JobGraph, so I think per-program acts like per-job-graph in 
this case, which provides compatibility for the many single-job-graph programs. 

Besides, we have to respect user program whic

Re: Stateful functions presentation code (UI part)

2019-10-31 Thread Igal Shilman
Hi Flavio,

We haven't included the UI source code just yet, we've only used it for
demos and talks.

The reason is that (1) we didn't put a lot of effort and time there, and (2) we
haven't yet taken the time to go through the individual dependencies and
licences.
But we will add that very soon.

Would having the UI code there improve your understanding, or is
there another reason?

Thanks,
Igal

On Thu, Oct 31, 2019 at 5:44 AM Flavio Pompermaier 
wrote:

> Hi Vino,
> I already checked that code but I can't find the UI part :(
>
> On Thu, Oct 31, 2019 at 12:32 PM vino yang  wrote:
>
>> Hi Flavio,
>>
>> Please see this link.[1]
>>
>> Best,
>> Vino
>>
>> [1]:
>> https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples/stateful-functions-ridesharing-example
>>
>> Flavio Pompermaier  于2019年10月31日周四 下午4:53写道:
>>
>>> Hi to all,
>>> yould it be possible to provide also the source code of the UI part of
>>> the ride sharing example? It would be interesting to me how the UI is
>>> reading the data from the Kafka egress.
>>>
>>> Best,
>>> Flavio
>>>
>>
>


Re: Stateful functions presentation code (UI part)

2019-10-31 Thread Flavio Pompermaier
Yes, I'm interested in how to read data from a UI... which egress should I
use? If we use a Kafka queue, how do we filter the data received in the topic?
Should I use a correlation id or a new topic per user?

On Thu, 31 Oct 2019, 16:08 Igal Shilman  wrote:

> Hi Flavio,
>
> We haven't included the UI source code just yet, we've only used it for
> demos and talks.
>
> The reason is that (1) we didn't put a lot of effort and time there (2)
> didn't check the time to go through the individual dependencies and
> licences.
> But we will add that very soon.
>
> Would having the UI code there would improve your understanding? or is
> there another reason?
>
> Thanks,
> Igal
>
> On Thu, Oct 31, 2019 at 5:44 AM Flavio Pompermaier 
> wrote:
>
>> Hi Vino,
>> I already checked that code but I can't find the UI part :(
>>
>> On Thu, Oct 31, 2019 at 12:32 PM vino yang  wrote:
>>
>>> Hi Flavio,
>>>
>>> Please see this link.[1]
>>>
>>> Best,
>>> Vino
>>>
>>> [1]:
>>> https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples/stateful-functions-ridesharing-example
>>>
>>> Flavio Pompermaier  于2019年10月31日周四 下午4:53写道:
>>>
 Hi to all,
 yould it be possible to provide also the source code of the UI part of
 the ride sharing example? It would be interesting to me how the UI is
 reading the data from the Kafka egress.

 Best,
 Flavio

>>>
>>


RE: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Chan, Regina
Yeah just chiming in this conversation as well. We heavily use multiple job 
graphs to get isolation around retry logic and resource allocation across the 
job graphs. Putting all these parallel flows into a single graph would mean 
sharing of TaskManagers across what was meant to be truly independent.

We also build our job graphs dynamically based off of the state of the world at 
the start of the job. While we’ve had a share of the pain described, my 
understanding is that there would be a tradeoff in number of jobs being 
submitted to the cluster and corresponding resource allocation requests. In the 
model with multiple jobs in a program, there’s at least the opportunity to 
reuse idle taskmanagers.




From: Theo Diefenthal 
Sent: Thursday, October 31, 2019 10:56 AM
To: user@flink.apache.org
Subject: Re: [DISCUSS] Semantic and implementation of per-job mode

I agree with Gyula Fora,

In our case, we have a client-machine in the middle between our YARN cluster 
and some backend services, which can not be reached directly from the cluster 
nodes. On application startup, we connect to some external systems, get some 
information crucial for the job runtime and finally build up the job graph to 
be committed.

It is true that we could workaround this, but it would be pretty annoying to 
connect to the remote services, collect the data, upload it to HDFS, start the 
job and make sure, housekeeping of those files is also done at some later time.

The current behavior also corresponds to the behavior of Sparks driver mode, 
which made the transition from Spark to Flink easier for us.

But I see the point, especially in terms of Kubernetes and would thus also vote 
for an opt-in solution, being the client compilation the default and having an 
option for the per-program mode as well.

Best regards


Von: "Flavio Pompermaier" mailto:pomperma...@okkam.it>>
An: "Yang Wang" mailto:danrtsey...@gmail.com>>
CC: "tison" mailto:wander4...@gmail.com>>, "Newport, 
Billy" mailto:billy.newp...@gs.com>>, "Paul Lam" 
mailto:paullin3...@gmail.com>>, "SHI Xiaogang" 
mailto:shixiaoga...@gmail.com>>, "dev" 
mailto:d...@flink.apache.org>>, "user" 
mailto:user@flink.apache.org>>
Gesendet: Donnerstag, 31. Oktober 2019 10:45:36
Betreff: Re: [DISCUSS] Semantic and implementation of per-job mode

Hi all,
we're using a lot the multiple jobs in one program and this is why: when you 
fetch data from a huge number of sources and, for each source, you do some 
transformation and then you want to write into a single directory the union of 
all outputs (this assumes you're doing batch). When the number of sources is 
large, if you want to do this in a single job, the graph becomes very big and 
this is a problem for several reasons:

  *   too many substasks /threadsi per slot
  *   increase of back pressure
  *   if a single "sub-job" fails all the job fails..this is very annoying if 
this happens after a half a day for example
  *   In our use case, the big-graph mode takes much longer than running each 
job separately (but maybe this is true only if you don't have much hardware 
resources)
  *   debugging the cause of a fail could become a daunting task if the job 
graph is too large
  *   we faced may strange errors when trying to run the single big-job mode 
(due to serialization corruption)
So, summarizing our overall experience with Flink batch is: the easier is the 
job graph the better!

Best,
Flavio


On Thu, Oct 31, 2019 at 10:14 AM Yang Wang 
mailto:danrtsey...@gmail.com>> wrote:
Thanks for tison starting this exciting discussion. We also suffer a lot from 
the per job mode.
I think the per-job cluster is a dedicated cluster for only one job and will 
not accept more other
jobs. It has the advantage of one-step submission, do not need to start 
dispatcher first and
then submit the job. And it does not matter where the job graph is generated 
and job is submitted.
Now we have two cases.

(1) Current Yarn detached cluster. The job graph is generated in client and 
then use distributed
cache to flink master container. And the MiniDispatcher uses 
`FileJobGraphRetrieve` to get it.
The job will be submitted at flink master side.


(2) Standalone per job cluster. User jars are already built into image. So the 
job graph will be
generated at flink master side and `ClasspathJobGraphRetriver` is used to get 
it. The job will
also be submitted at flink master side.


For the (1) and (2), only one job in user program could be supported. The per 
job means
per job-graph, so it works just as expected.


Tison suggests to add a new mode "per-program”. The user jar will be 
transferred to flink master
container, and a local client will be started to generate job graph and submit 
job. I think it could
cover all the functionality of current per job, both (1) and (2). Also the 
detach mode and attach
mode could be unified. We do not need to start a session cluster to simulate 
per job for multiple parts.


I am i

Re: Stateful functions presentation code (UI part)

2019-10-31 Thread Igal Shilman
For that particular example, the simulator [1] is responsible for
simulating physical drivers and passengers that interact with their
corresponding
stateful functions [2].
The interaction between the simulator and the stateful functions is
happening via four Kafka topics:
* to-driver - messages that are sent from the physical drivers (the
simulator [1]) to the stateful functions. The messages always carry a
driver-id which acts as a routing key (I think that this is what you mean
by correlation id) to a specific driver stateful function (FnDriver)
* from-driver - messages that are sent from a stateful function with a
specific driver id to the simulator
* to-passenger - symmetric to to-driver
* from-passenger - symmetric to from-driver.
The ingress and egress definition are specified here [3], and you may want
to checkout how to router is defined as well [4][5].

In addition the simulator is also feeding the UI directly by duplicating
the messages to a web socket (see [6])

I hope this clarifies the examples.

Igal.

[1]
https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-simulator/src/main/java/com/ververica/statefun/examples/ridesharing/simulator
[2]
https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-functions
[3]
https://github.com/ververica/stateful-functions/blob/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-functions/src/main/java/com/ververica/statefun/examples/ridesharing/KafkaSpecs.java#L43
[4]
https://github.com/ververica/stateful-functions/blob/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-functions/src/main/java/com/ververica/statefun/examples/ridesharing/Module.java#L33
[5]
https://github.com/ververica/stateful-functions/blob/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-functions/src/main/java/com/ververica/statefun/examples/ridesharing/InboundDriverRouter.java#L26
[6]
https://github.com/ververica/stateful-functions/blob/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-simulator/src/main/java/com/ververica/statefun/examples/ridesharing/simulator/simulation/Driver.java#L70
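
For readers who don't want to click through, a rough sketch of what a router
like [5] does. The message class below is a simplified stand-in for the protobuf
type used in the example, and the SDK package/interface names are quoted from
memory, so please double-check them against [4][5]:

import com.ververica.statefun.sdk.FunctionType;
import com.ververica.statefun.sdk.io.Router;

// Simplified stand-in for the example's protobuf message; only the id matters here.
class InboundDriverMessage {
  private final String driverId;
  InboundDriverMessage(String driverId) { this.driverId = driverId; }
  String getDriverId() { return driverId; }
}

final class InboundDriverRouterSketch implements Router<InboundDriverMessage> {

  // Stand-in for the function type declared in the example (FnDriver).
  private static final FunctionType DRIVER_FN = new FunctionType("ridesharing", "driver");

  @Override
  public void route(InboundDriverMessage message, Downstream<InboundDriverMessage> downstream) {
    // The driver id is the routing key: every message carrying the same id is
    // delivered to the same stateful function instance.
    downstream.forward(DRIVER_FN, message.getDriverId(), message);
  }
}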

On Thu, Oct 31, 2019 at 4:15 PM Flavio Pompermaier 
wrote:

> Yes, I'm interested in how to read data from a UI..which egress should I
> use? If we use a kafka queue, how to filter data received in the topic?
> Should I use a correlation id or use a new topic per user?
>
> Il Gio 31 Ott 2019, 16:08 Igal Shilman  ha scritto:
>
>> Hi Flavio,
>>
>> We haven't included the UI source code just yet, we've only used it for
>> demos and talks.
>>
>> The reason is that (1) we didn't put a lot of effort and time there (2)
>> didn't check the time to go through the individual dependencies and
>> licences.
>> But we will add that very soon.
>>
>> Would having the UI code there would improve your understanding? or is
>> there another reason?
>>
>> Thanks,
>> Igal
>>
>> On Thu, Oct 31, 2019 at 5:44 AM Flavio Pompermaier 
>> wrote:
>>
>>> Hi Vino,
>>> I already checked that code but I can't find the UI part :(
>>>
>>> On Thu, Oct 31, 2019 at 12:32 PM vino yang 
>>> wrote:
>>>
 Hi Flavio,

 Please see this link.[1]

 Best,
 Vino

 [1]:
 https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples/stateful-functions-ridesharing-example

 Flavio Pompermaier  于2019年10月31日周四 下午4:53写道:

> Hi to all,
> yould it be possible to provide also the source code of the UI part of
> the ride sharing example? It would be interesting to me how the UI is
> reading the data from the Kafka egress.
>
> Best,
> Flavio
>

>>>


Re: possible backwards compatibility issue between 1.8->1.9?

2019-10-31 Thread Piotr Nowojski
Hi,

It sounds strange. 

In the second example aren’t you just setting the “name” and “uid” for the last 
“map” transformation? While you would like to set it for `unorderedWait` and 
`filter` as well? I guess you can check this out in your application logs. Can 
you check what are the actual uids that are being set and used?

For the first example, I don’t know. Could it be an issue of Scala and some 
implicit magic/wrapping (since you are using JavaAsyncDataStream)? Maybe 
something adds some mapping operator to convert types? And the `uid`  is 
assigned not to the AsyncWaitOperator? 
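
To illustrate the point about where the uid lands, here is a sketch adapted from
the second snippet quoted below (the id strings are arbitrary, and the function
and filter classes are the ones from that snippet). `uid`/`name` apply to the
transformation that produced the stream they are called on, so to pin an id to
the async operator itself you call `uid()` directly on the result of
`unorderedWait()`, before any further transformations:

AsyncDataStream
    .unorderedWait(source, new EnricherFunction(config), 60, TimeUnit.SECONDS)
    .uid("EnricherCall_async")      // id for the AsyncWaitOperator's state
    .name("EnricherCall")
    .filter(new EnricherFilter())
    .uid("EnricherCall_filter")     // separate id for the filter
    .map(result -> result.get())    // unwrap, as in the snippet (Scala: _.get)
    .uid("EnricherCall_map");       // and another id for the map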

Also to be honest, the exception message doesn’t look like the one I would be 
expecting to see. It looks more like some issue with the state migration. 
Gordon, could you take a look? You also might be a bit more familiar with some 
quirks of setting the `uid`.

Piotrek

> On 31 Oct 2019, at 17:51, Bekir Oguz  wrote:
> 
> Hi Piotr,
> 
> We missed this note from the release notes, but still surprised to hit this 
> bug in our implementation which conforms the workaround solution explained.
> The weird behaviour is, we use this async stream in 2 different jobs. And in 
> TEST environment operator chaining disabled, in PROD enabled.
> 
> Our first job looks like this:
> val asyncStream =
>   JavaAsyncDataStream
> .unorderedWait(stream.javaStream, new Enricher(config), 60, 
> TimeUnit.SECONDS)
> .startNewChain()
> .uid("Enricher_ID")
> startNewChain() already sets the ChainingStrategy to HEAD (similar to the 
> bugfix for FLINK-13063) and we assign a unique UID which was suggested as a 
> workaround. 
> 
> Our second job looks like this:
> AsyncDataStream
>   .unorderedWait(
> source,
> new EnricherFunction(config),
> 60,
> TimeUnit.SECONDS
>   )
>   .filter(new EnricherFilter)
>   .map(_.get)
>   .name("EnricherCall")
>   .uid("EnricherCall_ID")
> 
> In test environment, the first job could not restore from last checkpoint (we 
> started with no state to fix this), but the second job succeeded.
> In prod environment, failure and the solution was the same for the first job. 
> But then the second job failed to restore its state (different behaviour than 
> test). Since this job has also a user state in a KeyedProcessFunction, 
> starting without state was not an option for us. We just tried to restart it 
> with "operator chaining disabled", and then surprisingly it worked.
> 
> How can we explain this different behaviour of the second job in test and 
> prod? The only visible difference is the operator chaining config.
> 
> Thanks in advance,
> Bekir
> 
> 
> 
> On Thu, 31 Oct 2019 at 09:44, Piotr Nowojski  > wrote:
> Hi,
> 
> (This question is more appropriate for the user mailing list, not dev - when 
> responding to my e-mail please remove dev mailing list from the recipients, 
> I’ve kept it just FYI that discussion has moved to user mailing list).
> 
> Could it be, that the problem is caused by changes in chaining strategy of 
> the AsyncWaitOperator in 1.9, as explained in the release notes [1]?
> 
> > AsyncIO
> > Due to a bug in the AsyncWaitOperator, in 1.9.0 the default chaining 
> > behaviour of the operator is now changed so that it 
> > is never chained after another operator. This should not be problematic for 
> > migrating from older version snapshots as 
> > long as an uid was assigned to the operator. If an uid was not assigned to 
> > the operator, please see the instructions here [2]
> > for a possible workaround.
> >
> > Related issues:
> >
> > • FLINK-13063: AsyncWaitOperator shouldn’t be releasing 
> > checkpointingLock [3]
> 
> Piotrek
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.9.html#asyncio
>  
> 
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/upgrading.html#matching-operator-state
>  
> 
> [3] https://issues.apache.org/jira/browse/FLINK-13063 
> 
> 
>> On 30 Oct 2019, at 16:52, Bekir Oguz > > wrote:
>> 
>> Hi guys,
>> during our upgrade from 1.8.1 to 1.9.1, one of our jobs fail to start with
>> the following exception. We deploy the job with 'allow-non-restored-state'
>> option and from the latest checkpoint dir of the 1.8.1 version.
>> 
>> org.apache.flink.util.StateMigrationException: The new state typeSerializer
>> for operator state must not be incompatible.
>>at org.apache.flink.runtime.state.DefaultOperatorStateBackend
>> .getListState(DefaultOperatorStateBackend.java:323)
>>at org.apache.flink.runtime.state.DefaultOperatorStateBackend
>> .getListState(DefaultOperatorStateBackend.java:214)
>>at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator
>> .initializeState(

Re: Stateful functions presentation code (UI part)

2019-10-31 Thread Flavio Pompermaier
Thanks Igal, this is more or less what I was expecting... this implies that
ALL events are received on the UI side.
I was concerned about the tradeoffs of this choice: when I zoom in on the map
I could simply ignore messages outside the boundaries (but I would still spend
a lot of CPU reading useless messages).

In the case of a worldwide company (e.g. Uber or similar) it's probably better
to create a topic per geographical area... but in that case the UI
should also know when to attach to or detach from topics when it reaches a
section of the map served by different topics... and I could still end up with
too many events even in a single map section (think of a
big city with many users).

Is there any talk at FF, or has someone else faced these issues too?

On Thu, 31 Oct 2019, 17:44 Igal Shilman  wrote:

> For that particular example, the simulator [1] is responsible for
> simulating physical drivers and passengers that interact with their
> corresponding
> stateful functions [2].
> The interaction between the simulator and the stateful functions is
> happening via four Kafka topics:
> * to-driver - messages that are sent from the physical drivers (the
> simulator [1]) to the stateful functions. The messages always carry a
> driver-id which acts as a routing key (I think that this is what you mean
> by correlation id) to a specific driver stateful function (FnDriver)
> * from-driver - messages that are sent from a stateful function with a
> specific driver id to the simulator
> * to-passenger - symmetric to to-driver
> * from-passenger - symmetric to from-driver.
> The ingress and egress definition are specified here [3], and you may want
> to checkout how to router is defined as well [4][5].
>
> In addition the simulator is also feeding the UI directly by duplicating
> the messages to a web socket (see [6])
>
> I hope this clarifies the examples.
>
> Igal.
>
> [1]
> https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-simulator/src/main/java/com/ververica/statefun/examples/ridesharing/simulator
> [2]
> https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-functions
> [3]
> https://github.com/ververica/stateful-functions/blob/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-functions/src/main/java/com/ververica/statefun/examples/ridesharing/KafkaSpecs.java#L43
> [4]
> https://github.com/ververica/stateful-functions/blob/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-functions/src/main/java/com/ververica/statefun/examples/ridesharing/Module.java#L33
> [5]
> https://github.com/ververica/stateful-functions/blob/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-functions/src/main/java/com/ververica/statefun/examples/ridesharing/InboundDriverRouter.java#L26
> [6]
> https://github.com/ververica/stateful-functions/blob/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-simulator/src/main/java/com/ververica/statefun/examples/ridesharing/simulator/simulation/Driver.java#L70
>
> On Thu, Oct 31, 2019 at 4:15 PM Flavio Pompermaier 
> wrote:
>
>> Yes, I'm interested in how to read data from a UI..which egress should I
>> use? If we use a kafka queue, how to filter data received in the topic?
>> Should I use a correlation id or use a new topic per user?
>>
>> Il Gio 31 Ott 2019, 16:08 Igal Shilman  ha scritto:
>>
>>> Hi Flavio,
>>>
>>> We haven't included the UI source code just yet, we've only used it for
>>> demos and talks.
>>>
>>> The reason is that (1) we didn't put a lot of effort and time there (2)
>>> didn't check the time to go through the individual dependencies and
>>> licences.
>>> But we will add that very soon.
>>>
>>> Would having the UI code there would improve your understanding? or is
>>> there another reason?
>>>
>>> Thanks,
>>> Igal
>>>
>>> On Thu, Oct 31, 2019 at 5:44 AM Flavio Pompermaier 
>>> wrote:
>>>
 Hi Vino,
 I already checked that code but I can't find the UI part :(

 On Thu, Oct 31, 2019 at 12:32 PM vino yang 
 wrote:

> Hi Flavio,
>
> Please see this link.[1]
>
> Best,
> Vino
>
> [1]:
> https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples/stateful-functions-ridesharing-example
>
> Flavio Pompermaier  于2019年10月31日周四 下午4:53写道:
>
>> Hi to all,
>> yould it be possible to provide also the source code of the UI part
>> of the ride sharing example? It would be interesting to me how the UI is
>> reading the data from the Kafka egress.
>>
>> Best,
>> Flavio
>>
>>

Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread kant kodali
Hi Huyen,

That is not my problem statement.  If it is just ingesting from A to B I am
sure there are enough tutorials for me to get it done. I also feel like the
more I elaborate the more confusing it gets and I am not sure why.

I want to join two streams and I want to see/query the results of that join
in real-time but I also have some constraints.

I come from the Spark world, and in Spark, if I do an inner join of two streams
A & B then I can see results only when there are matching rows between A &
B (by the definition of inner join this makes sense), but if I do an outer join
of two streams A & B I need to specify a time constraint, and only when
that time has fully elapsed can I see the rows that did not match. This means
that if my time constraint is one hour, I cannot query the intermediate results
until the hour has passed, and this is not the behavior I want. I want to be
able to run all sorts of SQL queries on the intermediate results.
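
(For context, the time-constrained join described above maps to what Flink SQL
calls an interval, or time-windowed, join. A rough sketch is shown below,
assuming tables A and B have already been registered with event-time attributes
called rowtime; all names, fields and the one-hour bound are placeholders, and
this does not by itself make the intermediate state queryable.)

// tableEnv is an already-created StreamTableEnvironment with tables A and B registered.
Table joined = tableEnv.sqlQuery(
    "SELECT A.id, A.payload AS a_payload, B.payload AS b_payload " +
    "FROM A FULL OUTER JOIN B ON A.id = B.id " +
    "AND B.rowtime BETWEEN A.rowtime - INTERVAL '1' HOUR " +
    "AND A.rowtime + INTERVAL '1' HOUR");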

I like Flink's idea of externalizing the state, so that I don't have to
worry about memory, but I am also trying to avoid writing a separate
microservice that polls and displays the intermediate results of the
join in real time. Instead, I am trying to see if there is a way to treat
that constantly evolving intermediate result as a streaming source, and
maybe do some more transformations and push it out to another sink.

Hope that makes sense.

Thanks,
Kant



On Thu, Oct 31, 2019 at 2:43 AM Huyen Levan  wrote:

> Hi Kant,
>
> So your problem statement is "ingest 2 streams into a data warehouse". The
> main component of the solution, from my view, is that SQL server. You can
> have a sink function to insert records in your two streams into two
> different tables (A and B), or upsert into one single table C. That upsert
> action itself serves as a join function, there's no need to join in Flink
> at all.
>
> There are many tools out there can be used for that ingestion. Flink, of
> course, can be used for that purpose. But for me, it's an overkill.
>
> Regards,
> Averell
>
> On Thu, 31 Oct. 2019, 8:19 pm kant kodali,  wrote:
>
>> Hi Averell,
>>
>> yes,  I want to run ad-hoc SQL queries on the joined data as well as data
>> that may join in the future. For example, let's say if you take datasets A
>> and B in streaming mode a row in A can join with a row B in some time in
>> future let's say but meanwhile if I query the intermediate state using SQL
>> I want the row in A that have not yet joined with B to also be available to
>> Query. so not just joined results alone but also data that might be join in
>> the future as well since its all streaming.
>>
>> Thanks!
>>
>


Re: No FileSystem for scheme "file" for S3A in and state processor api in 1.9

2019-10-31 Thread spoganshev
The problem happens in batch jobs (the ones that use ExecutionEnvironment)
that use the State Processor API to bootstrap the initial savepoint for the
streaming job. 

We are building a single Docker image for the streaming and batch versions of
the job. In that image we put both the Presto filesystem (which we use for
checkpoints in the streaming job) and Hadoop into separate plugin folders. When
we run a batch job using this image, the aforementioned exception happens.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Preserving (best effort) messages order between operators

2019-10-31 Thread Yun Gao
 Hi Averell,

   If I understood correctly, the job graph is A (parallelism = 1) --> B 
(parallelism > 1). In that case, the records arriving at each subtask B_i should 
be in the same order as they were sent out from A. Therefore, could you also 
provide more details on the topology? Are there only the two operators? And 
could you also describe how the message order is checked in B_i? 

   Best,
   Yun


--
From:Averell 
Send Time:2019 Oct. 31 (Thu.) 12:55
To:user 
Subject:Preserving (best effort) messages order between operators

Hi, 

I have a source function with parallelism = 1, sending out records ordered
by event time. These records are then rebalanced to the next operator, which
has parallelism > 1. I observed that within each subtask of the 2nd
operator the order of the messages is not maintained. Is this behaviour
expected? If it is, is there any way to avoid it, or at least reduce it?
I have high back-pressure on that 2nd operator, as the one after it is
slow. There is also high back-pressure on the 1st operator, which makes my
problem more severe (the mentioned out-of-order is high). If I could
throttle the 1st operator when back-pressure is high, then I could mitigate
the mentioned problem. But I could not find any guide on doing that.
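
(As an aside on the throttling point: as far as I know there is no general
built-in rate limit for arbitrary sources, but a minimal sketch of a
self-throttling source is shown below. The record type and the rate are
placeholders; it simply sleeps between emissions.)

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Emits at most maxRecordsPerSecond records per second by sleeping between
// emissions, independently of downstream back-pressure.
public class ThrottledNumberSource implements SourceFunction<Long> {

    private final long maxRecordsPerSecond;
    private volatile boolean running = true;

    public ThrottledNumberSource(long maxRecordsPerSecond) {
        this.maxRecordsPerSecond = maxRecordsPerSecond;
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long next = 0L;
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(next++);
            }
            Thread.sleep(1000L / maxRecordsPerSecond); // crude fixed-rate throttle
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}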

Could you please help?

Thanks.
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Rong Rong
Hi All,

Thanks @Tison for starting the discussion; I think we have a very similar
scenario to Theo's use cases.
In our case we also generate the job graph using a client service (which
serves job graph generation for multiple pieces of user code), and we've
found managing the upload/download between the cluster and the DFS to
be tricky and error-prone. In addition, the management of different
environments and requirements from different users in a single service poses
even more trouble for us.

However, shifting the job graph generation towards the cluster side also
requires some thought regarding how to manage the driver-job as well as
some dependency conflicts - in the case of shipping the job graph
generation to the cluster, some dependencies that are unnecessary for the
runtime will be pulled in by the driver-job (correct me if I'm wrong, Theo).

I think in general I agree with @Gyula's main point: unless there is a very
strong reason, it is better if we put the driver-mode as an opt-in (at
least at the beginning).
I left some comments on the document as well. Please kindly take a look.

Thanks,
Rong

On Thu, Oct 31, 2019 at 9:26 AM Chan, Regina  wrote:

> Yeah just chiming in this conversation as well. We heavily use multiple
> job graphs to get isolation around retry logic and resource allocation
> across the job graphs. Putting all these parallel flows into a single graph
> would mean sharing of TaskManagers across what was meant to be truly
> independent.
>
>
>
> We also build our job graphs dynamically based off of the state of the
> world at the start of the job. While we’ve had a share of the pain
> described, my understanding is that there would be a tradeoff in number of
> jobs being submitted to the cluster and corresponding resource allocation
> requests. In the model with multiple jobs in a program, there’s at least
> the opportunity to reuse idle taskmanagers.
>
>
>
>
>
>
>
>
>
> *From:* Theo Diefenthal 
> *Sent:* Thursday, October 31, 2019 10:56 AM
> *To:* user@flink.apache.org
> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>
>
>
> I agree with Gyula Fora,
>
>
>
> In our case, we have a client-machine in the middle between our YARN
> cluster and some backend services, which can not be reached directly from
> the cluster nodes. On application startup, we connect to some external
> systems, get some information crucial for the job runtime and finally build
> up the job graph to be committed.
>
>
>
> It is true that we could workaround this, but it would be pretty annoying
> to connect to the remote services, collect the data, upload it to HDFS,
> start the job and make sure, housekeeping of those files is also done at
> some later time.
>
>
>
> The current behavior also corresponds to the behavior of Sparks driver
> mode, which made the transition from Spark to Flink easier for us.
>
>
>
> But I see the point, especially in terms of Kubernetes and would thus also
> vote for an opt-in solution, being the client compilation the default and
> having an option for the per-program mode as well.
>
>
>
> Best regards
>
>
> --
>
> *Von: *"Flavio Pompermaier" 
> *An: *"Yang Wang" 
> *CC: *"tison" , "Newport, Billy" <
> billy.newp...@gs.com>, "Paul Lam" , "SHI Xiaogang"
> , "dev" , "user" <
> user@flink.apache.org>
> *Gesendet: *Donnerstag, 31. Oktober 2019 10:45:36
> *Betreff: *Re: [DISCUSS] Semantic and implementation of per-job mode
>
>
>
> Hi all,
>
> we're using a lot the multiple jobs in one program and this is why: when
> you fetch data from a huge number of sources and, for each source, you do
> some transformation and then you want to write into a single directory the
> union of all outputs (this assumes you're doing batch). When the number of
> sources is large, if you want to do this in a single job, the graph becomes
> very big and this is a problem for several reasons:
>
>- too many substasks /threadsi per slot
>- increase of back pressure
>- if a single "sub-job" fails all the job fails..this is very annoying
>if this happens after a half a day for example
>- In our use case, the big-graph mode takes much longer than running
>each job separately (but maybe this is true only if you don't have much
>hardware resources)
>- debugging the cause of a fail could become a daunting task if the
>job graph is too large
>- we faced may strange errors when trying to run the single big-job
>mode (due to serialization corruption)
>
> So, summarizing our overall experience with Flink batch is: the easier is
> the job graph the better!
>
>
>
> Best,
>
> Flavio
>
>
>
>
>
> On Thu, Oct 31, 2019 at 10:14 AM Yang Wang  wrote:
>
> Thanks for tison starting this exciting discussion. We also suffer a lot
> from the per job mode.
>
> I think the per-job cluster is a dedicated cluster for only one job and
> will not accept more other
>
> jobs. It has the advantage of one-step submission, do not need to start
> dispat

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread bupt_ljy
Hi all,


Firstly, thanks @tison for bringing this up, and a strong +1 for the overall design. 


I’d like to add one more example of "multiple jobs in one program" from what 
I’m currently working on. I’m trying to run a TPC-DS benchmark (including tens 
of SQL query jobs) on Flink and am suffering a lot from maintaining the client, 
because I can’t run this program in per-job mode and have to keep the client 
attached. 


Back to our discussion, I can see there is a divergence on where to compile the 
job graph: in the client or in the #ClusterEntrypoint. There are upsides and 
downsides either way. As for the opt-in solution, I have a question: what if the 
user chooses detached mode, compiles in the client, and runs a multi-job program 
at the same time? It is still not going to work.
Besides, by adding a compilation option, we need to consider more things when 
submitting a job, like "Does my program include multiple jobs?" or "Does the 
program need to be initialized before submitting to a remote cluster?", which 
looks a bit complicated and confusing to me.


To summarize, I'll vote for the new per-program concept, but I may not prefer 
the opt-in option mentioned on the mailing list; or maybe we need to reconsider 
and find a better concept and definition that is easy to understand.




Best,
Jiayi Liao


 Original Message 
Sender: Rong Rong
Recipient: Regina" 
Cc: Theo Diefenthal; 
user@flink.apache.org
Date: Friday, Nov 1, 2019 11:01
Subject: Re: [DISCUSS] Semantic and implementation of per-job mode


Hi All,


Thanks @Tison for starting the discussion and I think we have very similar 
scenario with Theo's use cases. 
In our case we also generates the job graph using a client service (which 
serves multiple job graph generation from multiple user code) and we've found 
that managing the upload/download between the cluster and the DFS to be trick 
and error-prone. In addition, the management of different environment and 
requirement from different user in a single service posts even more trouble for 
us.


However, shifting the job graph generation towards the cluster side also 
requires some thoughts regarding how to manage the driver-job as well as some 
dependencies conflicts - In the case for shipping the job graph generation to 
the cluster, some unnecessary dependencies for the runtime will be pulled in by 
the driver-job (correct me if I were wrong Theo)



I think in general I agree with @Gyula's main point: unless there is a very 
strong reason, it is better if we put the driver-mode as an opt-in (at least at 
the beginning). 

I left some comments on the document as well. Please kindly take a look.


Thanks,
Rong


On Thu, Oct 31, 2019 at 9:26 AM Chan, Regina  wrote:

Yeah just chiming in this conversation as well. We heavily use multiple job 
graphs to get isolation around retry logic and resource allocation across the 
job graphs. Putting all these parallel flows into a single graph would mean 
sharing of TaskManagers across what was meant to be truly independent.
 
We also build our job graphs dynamically based off of the state of the world at 
the start of the job. While we’ve had a share of the pain described, my 
understanding is that there would be a tradeoff in number of jobs being 
submitted to the cluster and corresponding resource allocation requests. In the 
model with multiple jobs in a program, there’s at least the opportunity to 
reuse idle taskmanagers. 
 
 
 
 
From: Theo Diefenthal  
 Sent: Thursday, October 31, 2019 10:56 AM
 To: user@flink.apache.org
 Subject: Re: [DISCUSS] Semantic and implementation of per-job mode
 
I agree with Gyula Fora,
 
In our case, we have a client-machine in the middle between our YARN cluster 
and some backend services, which can not be reached directly from the cluster 
nodes. On application startup, we connect to some external systems, get some 
information crucial for the job runtime and finally build up the job graph to 
be committed.
 
It is true that we could workaround this, but it would be pretty annoying to 
connect to the remote services, collect the data, upload it to HDFS, start the 
job and make sure, housekeeping of those files is also done at some later time. 
 
The current behavior also corresponds to the behavior of Sparks driver mode, 
which made the transition from Spark to Flink easier for us. 
 
But I see the point, especially in terms of Kubernetes and would thus also vote 
for an opt-in solution, being the client compilation the default and having an 
option for the per-program mode as well.
 
Best regards
 
Von: "Flavio Pompermaier" 
 An: "Yang Wang" 
 CC: "tison" , "Newport, Billy" , 
"Paul Lam" , "SHI Xiaogang" , 
"dev" , "user" 
 Gesendet: Donnerstag, 31. Oktober 2019 10:45:36
 Betreff: Re: [DISCUSS] Semantic and implementation of per-job mode
 
Hi all, 
we're using a lot the multiple jobs in one program and this is why: when you 
fetch data from a huge number of sources and, for each source, you do some 
transformation and then you 

How to write type information for a Java Set and List inside a Tuple?

2019-10-31 Thread Komal Mariam
Hi all,

I'm trying to create a MapState, Set,
List>> for KeyedBroadcastProcessFunction but I'm not sure how to
initialize its MapStateDescriptor.

I have written it in two ways as given below and my IDE isn't showing an
error either way (haven't tested on runtime yet).

I'd really appreciate it if anyone could tell me which way is correct, and if
not, what's the best way to give type hints for  Tuple3,
Set, List>

myClass is POJO type.

Code Snippet 1:

private final MapStateDescriptor,
Set, List>> outStateDesc =
new MapStateDescriptor<>(
"neighbours",
BasicTypeInfo.INT_TYPE_INFO,
new TupleTypeInfo<>(TypeInformation.of(new
TypeHint() {}),
TypeInformation.of(new TypeHint() {}),
new ListTypeInfo<>(myClass.class)));



Code Snippet 2:
private final MapStateDescriptor,
Set, List>> outStateDesc =
new MapStateDescriptor<>(
"neighbours",
BasicTypeInfo.INT_TYPE_INFO,
TypeInformation.of(new TypeHint,
Set, List>>(){}));


Best Regards,
Komal


Re: How to write type information for a Java Set and List inside a Tuple?

2019-10-31 Thread Jingsong Lee
Hi Komal:
I think snippet 1 is better, because it carries more information, such as the
ListTypeInfo.
Consider snippet 2: our type inference in TypeInformation.of cannot
infer the nested information (it does not capture the List element type).
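
To make that concrete, here is a small sketch along the lines of snippet 1. The
archive stripped the generic parameters from the original mail, so the concrete
types below (Integer keys and a Tuple3 of MyClass, Set<Integer> and
List<MyClass>) are only assumptions for illustration:

import java.util.List;
import java.util.Set;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

public class NeighboursStateDescriptor {

    // Explicit nested type information: the List element type is preserved via
    // ListTypeInfo instead of being erased behind a generic fallback.
    static final MapStateDescriptor<Integer, Tuple3<MyClass, Set<Integer>, List<MyClass>>> DESC =
        new MapStateDescriptor<>(
            "neighbours",
            BasicTypeInfo.INT_TYPE_INFO,
            new TupleTypeInfo<>(
                TypeInformation.of(MyClass.class),
                TypeInformation.of(new TypeHint<Set<Integer>>() {}),  // Sets still fall back to generic (Kryo) serialization
                new ListTypeInfo<>(MyClass.class)));

    // Placeholder POJO standing in for the user's class.
    public static class MyClass {
        public int id;
        public MyClass() {}
    }
}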

On Fri, Nov 1, 2019 at 11:49 AM Jingsong Li  wrote:

> Hi Komal:
> I think snippet 1 is better, because it carry more information like
> ListTypeInfo.
> Consider snippet 2, now our type inference in TypeInformation.of can not
> infer the nested information. (It not get the information: List)
>
> On Fri, Nov 1, 2019 at 11:34 AM Komal Mariam 
> wrote:
>
>> Hi all,
>>
>> I'm trying to create a MapState,
>> Set, List>> for KeyedBroadcastProcessFunction but I'm
>> not sure how to initialize its MapStateDescriptor.
>>
>> I have written it in two ways as given below and my IDE isn't showing an
>> error either way (haven't tested on runtime yet).
>>
>> I'd really appreciate if anyone can tell me which way is correct and if
>> not what's the best way to give Type Hints for  Tuple3,
>> Set, List>
>>
>> myClass is POJO type.
>>
>> Code Snippet 1:
>>
>> private final MapStateDescriptor,
>> Set, List>> outStateDesc =
>> new MapStateDescriptor<>(
>> "neighbours",
>> BasicTypeInfo.INT_TYPE_INFO,
>> new TupleTypeInfo<>(TypeInformation.of(new
>> TypeHint() {}),
>> TypeInformation.of(new TypeHint()
>> {}),
>> new ListTypeInfo<>(myClass.class)));
>>
>>
>>
>> Code Snippet 2:
>> private final MapStateDescriptor,
>> Set, List>> outStateDesc =
>> new MapStateDescriptor<>(
>> "neighbours",
>> BasicTypeInfo.INT_TYPE_INFO,
>> TypeInformation.of(new
>> TypeHint, Set, List>>(){}));
>>
>>
>> Best Regards,
>> Komal
>>
>
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: Preserving (best effort) messages order between operators

2019-10-31 Thread Huyen Levan
Hi Yun,

My job graph is: (A: 1) -(rebalance)-> (B: 32) -(hash)-> (C: 32). A lists
files and forwards them to B as FileInputSplits. B parses those files and
shuffles the data records to C as keyed streams.
C is the slowest in the graph, A is the fastest.

I relied on the slf4j/logback logs to derive that conclusion. There's one
log entry for each context.collect() call of A, and there's one log entry
whenever B opens a new FileInputSplit (B is Flink's
ContinuousFileReaderOperator).
My logback configuration is: %d{yyyy-MM-dd
HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} -
%msg%n

The logs I got from A showed messages in order (by *dt* in my case).
However, the logs I got from B showed that the messages' order was lost (please
refer to the logs below). I assume that each logback %thread corresponds to
exactly one B_i.

Thanks and regards,
Averell

2019-10-30 05:30:43.548 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-12/part-00119-2dd7fe37-5e1b-4bc7-8bc4-fc632b419ac0
2019-10-30 05:30:51.239 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-13/part-1-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:06.537 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-13/part-00083-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:13.611 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-13/part-00159-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:20.826 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-14/part-00041-c4b2a37e-066d-4adb-b610-a714e7b45b8b
2019-10-30 05:31:28.487 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-14/part-00121-c4b2a37e-066d-4adb-b610-a714e7b45b8b
2019-10-30 05:31:35.806 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-15/part-1-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:31:42.739 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-15/part-00081-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:31:49.861 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-01/part-00045-1dc6388b-b72c-4bcd-a337-35c371b583f6
2019-10-30 05:31:55.834 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-01/part-00130-1dc6388b-b72c-4bcd-a337-35c371b583f6
2019-10-30 05:32:02.097 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-15/part-00161-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:32:06.452 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-02/part-0-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:11.379 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-02/part-00077-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:16.103 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-02/part-00147-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:21.025 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-16/part-00039-d12ed910-d58b-46b2-b607-784ebf1266d4
2019-10-30 05:32:25.758 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-03/part-00043-92a58007-0c35-479b-b9e5-6663fae4e71c
2019-10-30 05:32:30.156 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-03/part-00123-92a58007-0c35-479b-b9e5-6663fae4e71c
2019-10-30 05:32:34.169 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-16/part-00121-d12ed910-d58b-46b2-b607-784ebf1266d4
2019-10-30 05:32:39.462 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-04/part-1-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:43.551 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-04/part-00085-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:48.100 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-04/part-00166-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:52.629 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-17/part-1-491d8c85-7eb2-48c7-af06-501934f65a83
2019-10-30 05:32:57.834 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-05/part-00045-19080414-962a-455c-b342-fcf3e36f1cc5
2019-10-30 05:33:01.943 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-05/part-00113-19080414-962a-455c-b342-fcf3e36f1cc5
2019-10-30 05:33:06.871 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-17/part-00082-491d8c85-7eb2-48c7-af06-501934f65a83

On Fri, Nov 1, 2019 at 1:32 PM Yun Gao  wrote:

>  Hi Averell,
>
>If I understood right, the job graph is A (parallelism = 1) -->
> B (parallelism > 1), then I think the records sending into the subtask B_i
> should be the same as the order sending out from A. Therefore, could you
> also provide more details on the topology ? Is there only the two
> operators? And could you also provide how the message order is checked in
> B_i ?
>
>Best,
>Yun
>
> --
> From:Averell 
> Send Time:2019 Oct. 31 (Thu.) 12:55
> To:user 
> Subject:Preserving (best effort) messages order between operators
>
> Hi,
>
> I have a source function with paral

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Peter Huang
Hi Tison and Community,

Thanks for bringing it up. Actually, we hit a similar bottleneck with the
per-job cluster mode. Our team built a service for deploying and operating
Flink jobs.
The service sits in front of YARN clusters. To submit different job jars,
we need to download the client jar into the service and generate a job
graph, which is time-consuming.
Thus, we came up with the idea of a Delayed Job Graph to move the job graph
generation into the ClusterEntrypoint rather than the client side. Compared to
your proposal, it is more lightweight, and it is an option for the existing
per-job mode. But it is not a solution for handling multiple job graphs within
a program.

I am looking forward to more comments on the proposal, and also to
cooperation on this effort.
I hope both of our pain points can be resolved and the result contributed back
to the community.


https://docs.google.com/document/d/1aAwVjdZByA-0CHbgv16Me-vjaaDMCfhX7TzVVTuifYM/edit?ts=5da1f4d7#heading=h.be92q3uiam4t


Best Regards
Peter Huang

















On Thu, Oct 31, 2019 at 8:17 PM bupt_ljy  wrote:

> Hi all,
>
>
> Firstly thanks @tison for bring this up and strongly +1 for the overall
> design.
>
>
> I’d like to add one more example of "multiple jobs in one program" with
> what I’m currently working on. I’m trying to run a TPC-DS benchmark testing
> (including tens of sql query job) on Flink and sufferring a lot from
> maintaining the client because I can’t run this program in per-job mode and
> have to make the client attached.
>
>
> Back to our discussion, I can see now there is a divergence of compiling
> the job graph between in client and in #ClusterEntrypoint. And up and
> downsides exist in either way. As for the opt-in solution, I have a
> question, what if the user chooses detach mode, compiling in the client and
> runs a multi-job program at the same time? And it still not gonna work.
>
> Besides, by adding an compiling option, we need to consider more things
> when submitting a job like "Is my program including multiple job?" or "Does
> the program need to be initialized before submitting to a remote cluster?",
> which looks a bit complicated and confusing to me.
>
>
> By summarizing, I'll vote for the per-program new concept but I may not
> prefer the opt-in option mentioned in the mailing list or maybe we need to
> reconsider a better concept and definition which is easy to understand.
>
>
>
> Best,
>
> Jiayi Liao
>
>  Original Message
> *Sender:* Rong Rong
> *Recipient:* Regina" 
> *Cc:* Theo Diefenthal;
> user@flink.apache.org
> *Date:* Friday, Nov 1, 2019 11:01
> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>
> Hi All,
>
> Thanks @Tison for starting the discussion and I think we have very similar
> scenario with Theo's use cases.
> In our case we also generates the job graph using a client service (which
> serves multiple job graph generation from multiple user code) and we've
> found that managing the upload/download between the cluster and the DFS to
> be trick and error-prone. In addition, the management of different
> environment and requirement from different user in a single service posts
> even more trouble for us.
>
> However, shifting the job graph generation towards the cluster side also
> requires some thought regarding how to manage the driver job, as well as
> some dependency conflicts - in the case of shipping the job graph
> generation to the cluster, some dependencies unnecessary at runtime
> will be pulled in by the driver job (correct me if I'm wrong, Theo).
>
> I think in general I agree with @Gyula's main point: unless there is a
> very strong reason, it is better if we put the driver-mode as an opt-in (at
> least at the beginning).
> I left some comments on the document as well. Please kindly take a look.
>
> Thanks,
> Rong
>
> On Thu, Oct 31, 2019 at 9:26 AM Chan, Regina  wrote:
>
>> Yeah just chiming in this conversation as well. We heavily use multiple
>> job graphs to get isolation around retry logic and resource allocation
>> across the job graphs. Putting all these parallel flows into a single graph
>> would mean sharing TaskManagers across what were meant to be truly
>> independent jobs.
>>
>>
>>
>> We also build our job graphs dynamically based on the state of the
>> world at the start of the job. While we've had our share of the pain
>> described, my understanding is that there would be a tradeoff between the
>> number of jobs submitted to the cluster and the corresponding resource
>> allocation requests. In the model with multiple jobs in one program,
>> there's at least the opportunity to reuse idle TaskManagers.
>>
>> *From:* Theo Diefenthal 
>> *Sent:* Thursday, October 31, 2019 10:56 AM
>> *To:* user@flink.apache.org
>> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>>
>>
>>
>> I agree with Gyula Fora,
>>
>>
>>
>> In our case, we have a client-machine in the middle between our YARN
>> cluster and some backend services, which can not be reach

Re: low performance in running queries

2019-10-31 Thread Zhenghua Gao
2019-10-30 15:59:52,122 INFO
org.apache.flink.runtime.taskmanager.Task - Split
Reader: Custom File Source -> Flat Map (1/1)
(6a17c410c3e36f524bb774d2dffed4a4) switched from DEPLOYING to RUNNING.

2019-10-30 17:45:10,943 INFO
org.apache.flink.runtime.taskmanager.Task - Split
Reader: Custom File Source -> Flat Map (1/1)
(6a17c410c3e36f524bb774d2dffed4a4) switched from RUNNING to FINISHED.


It's surprising that the source task took roughly 105 minutes (15:59 to
17:45) to read a 2 GB file.

Could you give me your code snippets and some sample lines of the 2G file?

I will try to reproduce your scenario and dig into the root cause.
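
For reference, a minimal streaming word count of the kind discussed in this
thread, with the parallelism set explicitly, could look like the sketch below
(the paths and the parallelism value are placeholders, not taken from your
setup):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Without this (or -p on the CLI), the job may fall back to parallelism 1.
        env.setParallelism(8);

        env.readTextFile("/tmp/file")                      // placeholder input path
           .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
               @Override
               public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                   // Split each line into words and emit (word, 1) pairs.
                   for (String word : line.toLowerCase().split("\\W+")) {
                       if (!word.isEmpty()) {
                           out.collect(new Tuple2<>(word, 1));
                       }
                   }
               }
           })
           .keyBy(0)
           .sum(1)
           .writeAsText("/tmp/wordcount-result");          // placeholder output path

        env.execute("Streaming WordCount");
    }
}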


*Best Regards,*
*Zhenghua Gao*


On Thu, Oct 31, 2019 at 9:05 PM Habib Mostafaei 
wrote:

> I enclosed all logs from the run, and for this run I used parallelism one.
> However, for the other runs I checked, all parallel workers were working
> properly. Is there a simple way to get profiling information in Flink?
>
> Best,
>
> Habib
> On 10/31/2019 2:54 AM, Zhenghua Gao wrote:
>
> I think more runtime information would help figure out where the problem
> is:
> 1) how many parallel instances are actually working
> 2) the metrics for each operator
> 3) the JVM profiling information, etc.
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Wed, Oct 30, 2019 at 8:25 PM Habib Mostafaei 
> wrote:
>
>> Thanks Gao for the reply. I used the parallelism parameter with different
>> values like 6 and 8, but the execution time is still not comparable with
>> that of a single-threaded Python script. What would be a reasonable value
>> for the parallelism?
>>
>> Best,
>>
>> Habib
>> On 10/30/2019 1:17 PM, Zhenghua Gao wrote:
>>
>> The reason might be that the parallelism of your task is only 1, which is
>> too low.
>> See [1] for how to specify a proper parallelism for your job; the execution
>> time should then be reduced significantly.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Tue, Oct 29, 2019 at 9:27 PM Habib Mostafaei 
>> wrote:
>>
>>> Hi all,
>>>
>>> I am running Flink on a standalone cluster and getting very long
>>> execution times for streaming queries like WordCount on a fixed text
>>> file. My VM runs Debian 10 with 16 CPU cores and 32 GB of RAM. I
>>> have a text file of 2 GB. When I run Flink on a standalone
>>> cluster, i.e., one JobManager and one TaskManager with 25 GB of heap size,
>>> it takes around two hours to finish counting this file, while a simple
>>> Python script can do it in around 7 minutes. I am just wondering what is
>>> wrong with my setup. I ran the experiments on a cluster with six
>>> TaskManagers, but I still get very long execution times, like 25 minutes
>>> or so. I tried increasing the JVM heap size to lower the execution
>>> time, but it did not help. I attached the log file and the Flink
>>> configuration file to this email.
>>>
>>> Best,
>>>
>>> Habib
>>>
>>>
>


Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread tison
Hi all,

Thanks for your participation! First of all, I have to clarify two points of
confusion in this thread.

1. The proposed "per-program" mode is definitely a new, opt-in mode. It is
described in the
"Compatibility" section of the original email.

2. The "Flink driver" documentation linked in the original email is NOT the
proposed design.
See the original paragraph below; I'm sorry for linking it in the first
email, which caused
further confusion.

> In order to provide a "per-program" semantic mode which acts "just like"
> session mode but with a dedicated cluster, I propose a workflow as below.
> It acts like starting a driver on the cluster but is NOT a general driver
> solution as proposed here[3]; the main purpose of the workflow below is to
> provide a "per-program" semantic mode.
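
To make the "multiple jobs in one program" case concrete, a user program along
the lines of the sketch below submits several independent jobs from one main()
and therefore cannot be expressed as a single JobGraph, which is exactly what
per-job mode assumes today. The paths, names, and loop body are purely
illustrative:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MultiJobProgram {
    public static void main(String[] args) throws Exception {
        for (int i = 1; i <= 3; i++) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // A trivial per-iteration pipeline; a TPC-DS style benchmark would
            // build a different query here in each round.
            env.readTextFile("/tmp/input-" + i)
               .filter(line -> !line.isEmpty())
               .writeAsText("/tmp/output-" + i);

            // Each execute() call compiles and submits a separate JobGraph, so a
            // per-job cluster built around exactly one JobGraph cannot run this
            // program as a whole; the attached client has to drive all of them.
            env.execute("query-" + i);
        }
    }
}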

I'm reading your ideas in detail and writing a reply now :-)

Best,
tison.


Peter Huang  wrote on Friday, November 1, 2019 at 12:47 PM:

> Hi Tison and Community,
>
> Thanks for bringing it up. Actually, we hit a similar bottleneck with the
> per-job cluster mode. Our team built a service for deploying and operating
> Flink jobs.
> The service sits in front of YARN clusters. To submit different job jars,
> we need to download each client jar into the service and generate the job
> graph there, which is time-consuming.
> Thus, we came up with the idea of a Delayed Job Graph, which moves job graph
> generation into the ClusterEntrypoint rather than the client side. Compared
> to your proposal, it is more lightweight,
> and it is an option for the existing per-job mode. But it is not a solution
> for handling multiple job graphs within a program.
>
> I am looking forward to more comments on the proposal, and would definitely
> welcome cooperation on this effort.
> I hope both of our pain points can be resolved and the result contributed
> back to the community.
>
>
>
> https://docs.google.com/document/d/1aAwVjdZByA-0CHbgv16Me-vjaaDMCfhX7TzVVTuifYM/edit?ts=5da1f4d7#heading=h.be92q3uiam4t
>
>
> Best Regards
> Peter Huang
>
> On Thu, Oct 31, 2019 at 8:17 PM bupt_ljy  wrote:
>
>> Hi all,
>>
>>
>> Firstly, thanks @tison for bringing this up, and a strong +1 for the
>> overall design.
>>
>>
>> I'd like to add one more example of "multiple jobs in one program" from
>> what I'm currently working on. I'm trying to run a TPC-DS benchmark
>> (including tens of SQL query jobs) on Flink and suffering a lot from
>> maintaining the client, because I can't run this program in per-job mode
>> and have to keep the client attached.
>>
>>
>> Back to our discussion, I can see there is now a divergence over compiling
>> the job graph in the client versus in the ClusterEntrypoint, and upsides
>> and downsides exist either way. As for the opt-in solution, I have a
>> question: what if the user chooses detached mode, compiles in the client,
>> and runs a multi-job program at the same time? It is still not going to work.
>>
>> Besides, by adding a compilation option, we need to consider more things
>> when submitting a job, such as "Does my program include multiple jobs?" or
>> "Does the program need to be initialized before submitting to a remote
>> cluster?", which looks a bit complicated and confusing to me.
>>
>>
>> To summarize, I'll vote for the new per-program concept, but I may not
>> prefer the opt-in option mentioned in the mailing list; maybe we need to
>> reconsider a better concept and definition that is easy to understand.
>>
>>
>>
>> Best,
>>
>> Jiayi Liao
>>
>>  Original Message
>> *Sender:* Rong Rong
>> *Recipient:* Regina" 
>> *Cc:* Theo Diefenthal;
>> user@flink.apache.org
>> *Date:* Friday, Nov 1, 2019 11:01
>> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>>
>> Hi All,
>>
>> Thanks @Tison for starting the discussion; I think we have a very similar
>> scenario to Theo's use cases.
>> In our case we also generate the job graph using a client service (which
>> serves job graph generation for multiple pieces of user code), and we've
>> found that managing the upload/download between the cluster and the DFS is
>> tricky and error-prone. In addition, the management of different
>> environments and requirements from different users in a single service
>> poses even more trouble for us.
>>
>> However, shifting the job graph generation towards the cluster side also
>> requires some thought regarding how to manage the driver job, as well as
>> some dependency conflicts - in the case of shipping the job graph
>> generation to the cluster, some dependencies unnecessary at runtime
>> will be pulled in by the driver job (correct me if I'm wrong, Theo).
>>
>> I think in general I agree with @Gyula's main point: unless there is a
>> very strong reason, it is better if we put the driver-mode as an opt-in (at
>> least at the beginning).
>> I left some comments on the document as well. Please kindly take a look.
>>
>> Thanks,
>> Rong
>>
>> On Thu, Oct 31, 2019 at 9:26 AM Chan, Regina  wrote:
>>
>>> Yeah just chiming in this conversation as well. We heavily use multiple
>>> job graphs to g