Re: JobManager not receiving resource offers from Mesos

2018-01-04 Thread Dongwon Kim
Hi Till,

> It could be as simple as giving Flink the right role via 
> `mesos.resourcemanager.framework.role`.
The problem seems more related to resources (GPUs) than framework roles.

The cluster I'm working on consists of servers all equipped with GPUs.
When DC/OS is installed, a GPU-specific configuration parameter named 
"gpus_are_scarce" is enabled by default, which means DC/OS reserves GPU nodes 
exclusively for services that are configured to consume GPU resources 
(https://docs.mesosphere.com/1.10/deploying-services/gpu/).
Therefore, when I executed Flink without requesting GPUs, the hierarchical 
allocator had to exclude all GPU servers from the candidates.
Unfortunately, all servers that we use for Mesos agents have GPUs.

Your first guess was somewhat correct! It's because of GPU resources.

Now the problem is how to request GPUs when launching Flink clusters.
However, as you can see below, there is currently no way to make TaskManagers 
get GPUs allocated.
It would be great to have an option to specify how many GPUs are necessary for 
each TaskManager instance.
 

Best,
Dongwon



> On Jan 4, 2018, at 9:57 PM, Till Rohrmann wrote:
> 
> Hi Dongwon,
> 
> this looks indeed like a Mesos/DC/OS configuration issue. Maybe you could 
> reach out to the Mesos/DC/OS community. It could be as simple as giving Flink 
> the right role via `mesos.resourcemanager.framework.role`.
> 
> Cheers,
> Till
> 
> On Thu, Jan 4, 2018 at 10:45 AM, 김동원 wrote:
> Hi Till,
> 
>> could you check that your DC/OS cluster has enough capacity to launch the 
>> required task managers? It looks as if the JobManager never gets resource 
>> offers from the Mesos master.
> 
> 
> I have enough cpu and memory to have a single task manager.
> I did a simple test by launching additional 8 tasks while Flink JobManager is 
> still waiting for the Mesos master to send resource offers.
> {
>  "id": "simple-gpu-test",
>  "acceptedResourceRoles":["slave_public", "*"],
>  "cmd": "while [ true ] ; do nvidia-smi; sleep 5; done",
>  "cpus": 1,
>  "mem": 128,
>  "disk": 0,
>  "gpus": 1,
>  "instances": 8
> }
> 
> Below you can see that 9 tasks are being executed: 1 for Flink JobManager and 
> 8 for the above simple-gpu-test.
> 
> 
> What I suspect at the moment is the Mesos master, especially the hierarchical 
> allocator (the default allocation module inside the Mesos master).
> AFAIK the allocator tries to prepare resource offers after it gets REVIVE 
> call from any Mesos framework (Flink in this case).
> While preparing offers, the allocator seems to ignore the Flink framework due 
> to a certain condition about which I have no idea.
> Take a look at the below log messages from the Mesos master (when it gets 
> REVIVE call from Flink JobManager).
> FYI, 5865e700-bd58-4b24-b906-5658c364e45a-0001 is the Marathon framework and 
> 5865e700-bd58-4b24-b906-5658c364e45a-0014 is the Flink framework.
> 
> -
> I0104 09:06:54.00 23302 master.cpp:5225] Processing REVIVE call for 
> framework 5865e700-bd58-4b24-b906-5658c364e45a-0014 (Flink) at 
> scheduler-8911a6c5-d881-4ded-87f3-bcbd1e59ae3d@50.1.100.235:15656
> I0104 09:06:54.00 23302 process.cpp:3270] Resuming 
> hierarchical-allocator(1)@50.1.100.231:5050 at 
> 2018-01-04 00:06:54.710612992+00:00
> I0104 09:06:54.00 23302 hierarchical.cpp:1290] Revived offers for roles { 
> * } of framework 5865e700-bd58-4b24-b906-5658c364e45a-0014
> I0104 09:06:54.00 23302 hierarchical.cpp:2173] Filtered offer with 
> gpus:2; ports:[1025-2180, 2182-3887, 3889-5049, 5052-8079, 8082-8180, 
> 8182-32000]; disk:205900; cpus:48; mem:127634 on agent 
> 5865e700-bd58-4b24-b906-5658c364e45a-S0 for role slave_public of framework 
> 5865e700-bd58-4b24-b906-5658c364e45a-0001
> I0104 09:06:54.00 23302 hierarchical.cpp:2173] Filtered offer with 
> gpus:2; ports:[1025-2180, 2182-3887, 3889-5049, 5052-8079, 8082-8180, 
> 8182-32000]; disk:197491; cpus:48; mem:127634 on agent 
> 5865e700-bd58-4b24-b906-5658c364e45a-S2 for role slave_public of framework 
> 5865e700-bd58-4b24-b906-5658c364e45a-0001
> I0104 09:06:54.00 23302 hierarchical.cpp:2173] Filtered offer with 
> gpus:2; ports:[1025-2180, 2182-3887, 3889-5049, 5052-8079, 8082-8180, 
> 8182-15651, 15657-32000]; disk:197666; cpus:47; mem:126610 on agent 
> 5865e700-bd58-4b24-b906-5658c364e45a-S3 for role slave_public of framework 
> 5865e700-bd58-4b24-b906-5658c364e45a-0001
> I0104 09:06:54.00 23302 hierarchical.cpp:2173] Filtered offer with 
> gpus:2; ports:[1025-2180, 2182-3887, 3889-5049, 5052-8079, 8082-8180, 
> 8182-32000]; disk:199128; cpus:48; mem:127634 on agent 
> 5865e700-bd58-4b24-b906-5658c364e45a-S1 for role slave_public of framework 
> 5865e700-bd58-4b24-b906-5658c364e45a-0001
> I0104 09:06:54.0

[SEATTLE MEETUP] Announcing First Seattle Apache Flink Meetup

2018-01-04 Thread Bowen Li
Hi Flinkers,

After months of preparation, we are excited to announce our first Seattle
Apache Flink meetup, and invite you to join us!

*Please RSVP
at https://www.meetup.com/seattle-apache-flink/events/246458117.* Food and
drinks will be provided. Plenty of onsite parking spots.

DATE: Jan 17th, 2018, Wednesday

TALKS:

- Haitao Wang, Senior Staff Engineer at Alibaba, will give a presentation
on large-scale streaming processing with Flink and Flink SQL at Alibaba and
several internal use cases.

- Bowen Li will talk about details of future meetup planning and logistics,
also how they use Flink at OfferUp.

AGENDA

- 5:30pm - 6pm Food and networking
- 6pm - 7:30pm Presentations and Q&As

We can't wait to see you then!

Thanks,
Bowen, on behalf of the meetup organizing team


Queryable State in Flink 1.4

2018-01-04 Thread Boris Lublinsky
It appears that queryable state access changed significantly in 1.4 compared to 1.3.
The documentation on the queryable state client
(https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html#example)
states that the client needs to connect to a proxy port. My implementation, which I used for 1.3, is enclosed:

ModelServingKeyedJob.scala
Description: Binary data

Here I am setting both the server port and the proxy port. When I run it on localhost and do
lsof -i:9069 and lsof -i:9067, nothing shows up as using these ports. Am I missing something?
As a result, my query implementation

ModelStateQuery.scala
Description: Binary data

returns an error - connection refused. My state is defined in the following class:

DataProcessorKeyed.scala
Description: Binary data

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/
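
For illustration, a minimal sketch of a 1.4-style client that goes through the proxy port (the job ID, the state name "model-state", the key and value types, and port 9069 are placeholders/assumptions, not taken from the attached code):

import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QuerySketch {
    public static void main(String[] args) throws Exception {
        // Connect to the queryable state proxy (runs on the TaskManager), not the JobManager.
        QueryableStateClient client = new QueryableStateClient("localhost", 9069);

        JobID jobId = JobID.fromHexString(args[0]); // ID of the running job

        // Must match the descriptor under which the state was made queryable in the job.
        ValueStateDescriptor<Double> descriptor =
                new ValueStateDescriptor<>("model-state", Double.class);

        CompletableFuture<ValueState<Double>> future = client.getKvState(
                jobId, "model-state", "someKey", BasicTypeInfo.STRING_TYPE_INFO, descriptor);

        System.out.println(future.get().value());
    }
}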




Re: Apache Flink - Question about rolling window function on KeyedStream

2018-01-04 Thread M Singh
Hi Fabian:
Thanks for your answer - it is starting to make sense to me now. 

On Thursday, January 4, 2018 12:58 AM, Fabian Hueske  
wrote:
 

 Hi,

the ReduceFunction holds the last emitted record as state. When a new record 
arrives, it reduces the new record and last emitted record, updates its state, 
and emits the new result.
Therefore, a ReduceFunction emits one output record for each input record, 
i.e., it is triggered for each input record. The output of the ReduceFunction 
should be treated as a stream of updates not of final results.

Best, Fabian
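
For example, a minimal rolling sum on a keyed stream (types and input values made up for illustration) emits one updated partial sum per input record:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RollingReduceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("a", 3))
            .keyBy(0)
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1,
                                                      Tuple2<String, Integer> v2) {
                    return Tuple2.of(v1.f0, v1.f1 + v2.f1);
                }
            })
            // prints (a,1), (a,3), (a,6): one update per input record, no window involved
            .print();

        env.execute("rolling-reduce-example");
    }
}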

2018-01-03 18:46 GMT+01:00 M Singh :

Hi Stefan:
Thanks for your response.
A follow up question - In a streaming environment, we invoke the operation 
reduce and then output results to the sink. Does this mean reduce will be 
executed once on every trigger per partition with all the items in each 
partition ?

Thanks 

On Wednesday, January 3, 2018 2:46 AM, Stefan Richter 
 wrote:
 

 Hi,
I would interpret this as: the reduce produces an output for every new reduce 
call, emitting the updated value. There is no need for a window because it 
kicks in on every single invocation.
Best,
Stefan


On 31.12.2017 at 22:28, M Singh wrote:
Hi:
The Apache Flink documentation
(https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html)
describes a reduce function on a KeyedStream as follows:
A "rolling" reduce on a keyed data stream. Combines the current element with 
the last reduced value and emits the new value. 

A reduce function that creates a stream of partial sums:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});
The KeyedStream is not windowed, so when does the reduce function kick in to 
produce the DataStream (i.e., is there a default timeout or collection size 
that triggers it, since we have not defined any window on it)?
Thanks
Mans



   



   

Re: Apache Flink - broadcasting DataStream

2018-01-04 Thread M Singh
Thanks Fabian and Ufuk for your answers. 

On Thursday, January 4, 2018 12:32 AM, Fabian Hueske  
wrote:
 

 Hi,

A broadcast replicates all elements by the parallelism of the following 
operator, i.e., each parallel instance of the following operator receives all 
events of its input stream. If the operation following a broadcast is a connect 
with a co-operator (and the other input is not broadcasted), the operator can be 
used for arbitrary joins (not only equi-joins). Depending on the join semantics, 
the joining operator needs to keep events from one (broadcasted or 
non-broadcasted) or both inputs as state to be able to perform the join.
Best, Fabian
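
For illustration, a minimal sketch of the broadcast-plus-connect pattern (the stream contents and the matching logic are made up; in a real job the broadcasted elements would be kept in operator state rather than a plain field, as described above):

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class BroadcastJoinSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> mainStream = env.fromElements("error: disk full", "info: all good");
        DataStream<String> keywords = env.fromElements("error", "warn"); // small control stream

        mainStream
            .connect(keywords.broadcast()) // every parallel co-operator instance sees every keyword
            .flatMap(new CoFlatMapFunction<String, String, String>() {
                private final Set<String> seenKeywords = new HashSet<>();

                @Override
                public void flatMap1(String event, Collector<String> out) {
                    // non-broadcast side: match the event against all keywords seen so far
                    for (String keyword : seenKeywords) {
                        if (event.contains(keyword)) {
                            out.collect(event);
                            break;
                        }
                    }
                }

                @Override
                public void flatMap2(String keyword, Collector<String> out) {
                    // broadcast side: just remember the keyword
                    seenKeywords.add(keyword);
                }
            })
            .print();

        env.execute("broadcast-join-sketch");
    }
}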


2017-12-30 23:32 GMT+01:00 M Singh :

Hi Ufuk:
Thanks for your explanation.
I can understand broadcasting a small immutable dataset to the subtasks so that 
it can be joined with a stream.
However, I am still trying to understand how each broadcasted element from 
a stream will be used in a join operation with another stream. Is this just an 
optimization over joining two streams?
Also, I believe that subtasks operate on partitions of a stream and only 
equi-joins are possible for streams. So what is the reason we would like to 
broadcast each element to all the subtasks?
Thanks again. 

On Wednesday, December 27, 2017 12:52 AM, Ufuk Celebi  
wrote:
 

 Hey Mans!

This refers to how sub tasks are connected to each other in your
program. If you have a single sub task A1 and three sub tasks B1, B2,
B3, broadcast will emit each incoming record at A1 to all B1, B2, B3:

A1 --+-> B1
    +-> B2
    +-> B3

Does this help?

On Mon, Dec 25, 2017 at 7:12 PM, M Singh  wrote:
> 1. What elements get broadcast to the partitions?

Each incoming element is broadcasted

> 2. What happens as new elements are added to the stream ? Are only the new
> elements broadcast ?

Yes, each incoming element is broadcasted separately without any history.

> 3. Since the broadcast operation returns a DataStream, can it be used in a join?
> How do new (and old) elements affect the join results?

Yes, should be. Every element is broadcasted only once.

> 4. Similarly how does broadcast work with connected streams ?

Similar to non connected streams. The incoming records are emitted to
every downstream partition.

– Ufuk

   



   

Re: Service discovery for flink-metrics-prometheus

2018-01-04 Thread Stephan Ewen
How are you deploying your Flink processes? For Service Discovery
for Prometheus on Kubernetes, there are a few articles out there...

On Thu, Jan 4, 2018 at 3:52 PM, Aljoscha Krettek 
wrote:

> I'm not aware of how this is typically done but maybe Chesnay (cc'ed) has
> an idea.
>
> > On 14. Dec 2017, at 16:55, Kien Truong  wrote:
> >
> > Hi,
> >
> > Does anyone have recommendations about integrating
> flink-metrics-prometheus with some SD mechanism
> >
> > so that Prometheus can pick up the Task Manager's location dynamically ?
> >
> > Best regards,
> >
> > Kien
> >
>
>


Re: Testing Flink 1.4 unable to write to s3 locally

2018-01-04 Thread Stephan Ewen
This looks like the output from the client - do you have some TaskManager
log files with more log entries?

That would be helpful...


On Wed, Jan 3, 2018 at 5:26 PM, Kyle Hamlin  wrote:

> Hello Stephan & Nico,
>
> Here is the full stacktrace; it's not much more than what I originally
> posted. I remember seeing an XMLInputFactory input error at one point,
> but I haven't seen that again. Is there any other information I can provide
> that will help resolve?
>
> [flink-1.4.0] ./bin/flink run ~/streaming.jar
> Cluster configuration: Standalone cluster with JobManager at localhost/
> 127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8082
> Starting execution of program
> Submitting job with JobID: 022e4a310cd56ba4f7befc0286921663. Waiting for
> job completion.
> Connected to JobManager at 
> Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259]
> with leader session id ----.
> 01/03/2018 11:21:53 Job execution switched to status RUNNING.
> 01/03/2018 11:21:53 Source: Kafka -> Sink: Unnamed(1/1) switched to
> SCHEDULED
> 01/03/2018 11:21:53 Source: Kafka -> Sink: Unnamed(1/1) switched to
> DEPLOYING
> 01/03/2018 11:21:53 Source: Kafka -> Sink: Unnamed(1/1) switched to
> RUNNING
> 01/03/2018 11:21:54 Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
> java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.
> S3ErrorResponseHandler
> at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.
> s3.AmazonS3Client.(AmazonS3Client.java:363)
> at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.
> s3.AmazonS3Client.(AmazonS3Client.java:542)
> at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.
> hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
> at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.
> hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:212)
> at org.apache.flink.fs.s3presto.S3FileSystemFactory.create(
> S3FileSystemFactory.java:132)
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(
> FileSystem.java:397)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<
> init>(FsCheckpointStreamFactory.java:99)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend.
> createStreamFactory(FsStateBackend.java:277)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> createCheckpointStreamFactory(StreamTask.java:787)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initializeState(AbstractStreamOperator.java:247)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeOperators(StreamTask.java:694)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeState(StreamTask.java:682)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
>
> 01/03/2018 11:21:54 Job execution switched to status FAILING.
> java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.
> S3ErrorResponseHandler
> at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.
> s3.AmazonS3Client.(AmazonS3Client.java:363)
> at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.
> s3.AmazonS3Client.(AmazonS3Client.java:542)
> at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.
> hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
> at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.
> hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:212)
> at org.apache.flink.fs.s3presto.S3FileSystemFactory.create(
> S3FileSystemFactory.java:132)
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(
> FileSystem.java:397)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<
> init>(FsCheckpointStreamFactory.java:99)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend.
> createStreamFactory(FsStateBackend.java:277)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> createCheckpointStreamFactory(StreamTask.java:787)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initializeState(AbstractStreamOperator.java:247)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeOperators(StreamTask.java:694)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeState(StreamTask.java:682)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:253)
> at org.apache.flink.

Flink and Rest API

2018-01-04 Thread Alberto Ramón
* Read some values from a REST API in streaming / micro-batch fashion (example:
read the last value of Bitcoin)

* Expose queryable state as a queryable REST API (example: expose
intermediate results on demand)


Re: Flink State monitoring

2018-01-04 Thread Steven Wu
Aljoscha/Stefan,

if incremental checkpoint is enabled, I assume the "checkpoint size" is
only the delta/incremental size (not the full state size), right?

Thanks,
Steven


On Thu, Jan 4, 2018 at 5:18 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> I'm afraid there is currently no metrics around state. I see that it's
> very good to have so I'm putting it on my list of stuff that we should have
> at some point.
>
> One thing that comes to mind is checking the size of checkpoints, which
> gives you an indirect way of figuring out how big state is but that's not
> very exact, i.e. doesn't give you "number of keys" or some such.
>
> Best,
> Aljoscha
>
> > On 20. Dec 2017, at 08:09, Netzer, Liron  wrote:
> >
> > Ufuk, Thanks for replying !
> >
> > Aljoscha, can you please assist with the questions below?
> >
> > Thanks,
> > Liron
> >
> > -Original Message-
> > From: Ufuk Celebi [mailto:u...@apache.org]
> > Sent: Friday, December 15, 2017 3:06 PM
> > To: Netzer, Liron [ICG-IT]
> > Cc: user@flink.apache.org
> > Subject: Re: Flink State monitoring
> >
> > Hey Liron,
> >
> > unfortunately, there are no built-in metrics related to state. In
> general, exposing the actual values as metrics is problematic, but exposing
> summary statistics would be a good idea. I'm not aware of a good work
> around at the moment that would work in the general case (taking into
> account state restore, etc.).
> >
> > Let me pull in Aljoscha (cc'd) who knows the state backend internals
> well.
> >
> > @Aljoscha:
> > 1) Are there any plans to expose keyed state related metrics (like
> number of keys)?
> > 2) Is there a way to work around the lack of these metrics in 1.3?
> >
> > – Ufuk
> >
> > On Thu, Dec 14, 2017 at 10:55 AM, Netzer, Liron 
> wrote:
> >> Hi group,
> >>
> >>
> >>
> >> We are using Flink keyed state in several operators.
> >>
> >> Is there an easy way to expose the data that is stored in the state,
> i.e.
> >> the key and the values?
> >>
> >> This is needed for both monitoring as well as debugging. We would like
> >> to understand how many key+values are stored in each state and also to
> >> view the data itself.
> >>
> >> I know that there is the "Queryable state" option, but this is still
> >> in Beta, and doesn't really give us what we want easily.
> >>
> >>
> >>
> >>
> >>
> >> *We are using Flink 1.3.2 with Java.
> >>
> >>
> >>
> >> Thanks,
> >>
> >> Liron
>
>


Re: About Kafka08Fetcher and Kafka010Fetcher

2018-01-04 Thread Tzu-Li (Gordon) Tai
Hi Jaxon,

The threading model is implemented differently between the Kafka08Fetcher and 
all other fetcher versions higher than 0.9+ because the Kafka Java clients used 
between these versions have different abstraction levels.

The Kafka08Fetcher still uses the low-level `SimpleConsumer` API, which only 
allows connection to a single Kafka broker. Therefore, the Kafka08Fetcher 
maintains a thread for each broker that it connects to. If a subtask’s assigned 
partitions all exist on the same Kafka broker, only one thread will be created.

On the other hand, in versions 0.9+, the high-level `KafkaConsumer` API is 
used, which hides away the complexity for per-broker connections. I believe 
that the `KafkaConsumer` client still creates per-broker threads internally, if 
its assigned partitions span multiple brokers.

Cheers,
Gordon

On 2 January 2018 at 8:22:22 AM, Timo Walther (twal...@apache.org) wrote:

Maybe Gordon (in CC) can answer your question.  


On 1/1/18 at 3:36 PM, Jaxon Hu wrote:
> In Kafka08Fetcher, it uses a Map to manage
> multiple threads. But I notice that in Kafka09Fetcher or Kafka010Fetcher it's
> gone. So how does Kafka09Fetcher implement multi-threaded reads of partitions
> from Kafka?




Re: Exception on running an Elasticpipe flink connector

2018-01-04 Thread Nico Kruber
Hi Vipul,
Yes, this looks like a problem with a different netty version being
picked up.

First of all, let me advertise Flink 1.4 for this since there we
properly shade away our netty dependency (on version 4.0.27 atm) so you
(or in this case Elasticsearch) can rely on your required version. Since
you are executing your job inside a Flink cluster, there will be some
more things in the classpath than what your job itself requires, e.g.
core/runtime Flink dependencies and also things from the Hadoop
classpath if present.

Locally in the IDE, only a limited set of libraries are included and the
classpath is set up a bit differently, I suppose. Maybe, you are also
affected by the Maven shading problem for maven >= 3.3 [1][2].
As a workaround, can you try to shade elasticsearch's netty away? See
[3] for details.


Regards
Nico

[1] https://issues.apache.org/jira/browse/FLINK-5013
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/start/building.html
[3]
https://stackoverflow.com/questions/42962892/flink-with-elasticsearch-5-sink-conflicts-of-io-netty-library

On 04/01/18 07:09, vipul singh wrote:
> Hello,
> 
> We are working on a Flink ES connector, sourcing from a kafka stream,
> and sinking data into elasticsearch. The code works fine in intellij,
> but while running the code on emr(version 5.9, which uses flink 1.3.2)
> using flink-yarn-session, we are seeing this exception
> 
> Using the parallelism provided by the remote cluster (1). To use
> another parallelism, set it at the ./bin/flink client.
> 
> Starting execution of program
> 
> 2018-01-02 23:19:16,217 INFO org.apache.flink.yarn.YarnClusterClient
> - Starting program in interactive mode
> 
> 
> 
> The program finished with the following exception:
> 
> java.lang.NoSuchMethodError:
> 
> io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;
> 
> at
> 
> org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)
> 
> at
> 
> org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:449)
> 
> at
> 
> org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:91)
> 
> at
> 
> org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:976)
> 
> at
> 
> org.elasticsearch.transport.TcpTransport.sendRequest(TcpTransport.java:958)
> 
> at
> 
> org.elasticsearch.transport.TransportService.sendRequestInternal(TransportService.java:520)
> 
> at
> 
> org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:465)
> 
> at
> 
> org.elasticsearch.transport.TransportService.submitRequest(TransportService.java:451)
> 
> at
> 
> org.elasticsearch.client.transport.TransportClientNodesService$SimpleNodeSampler.doSample(TransportClientNodesService.java:403)
> 
> at
> 
> org.elasticsearch.client.transport.TransportClientNodesService$NodeSampler.sample(TransportClientNodesService.java:338)
> 
> at
> 
> org.elasticsearch.client.transport.TransportClientNodesService.addTransportAddresses(TransportClientNodesService.java:179)
> 
> at
> 
> org.elasticsearch.client.transport.TransportClient.addTransportAddress(TransportClient.java:301)
> 
> On searching online, it seems like this may be due to netty version
> conflicts.
> However, when we ran a dependency tree on our pom, we don't see netty
> coming from anywhere else but Flink:
> https://gist.github.com/neoeahit/b42b435e3c4519e632be87782de1cc06
> 
> Could you please suggest how can we resolve this error,
> 
> Thanks,
> Vipul
> 
> 
> 





RE: Two operators consuming from same stream

2018-01-04 Thread Sofer, Tovi
Hi Timo,

Actually I do keyBy in both cases, and in the split/duplicate case I do it on both 
split streams.

I did do the connect below twice and not once, but connect only calls ctor of 
ConnectedStreams, and doesn’t do any real operation.
So I don’t see how it will make a difference.
I can try it if you see a reason.


More detailed code including all keyBy:

Code without duplication looks something like:
KeyedStream orderKeyedStream = ordersStream.keyBy(field);
KeyedStream pricesKeyedStream = pricesStream.keyBy(field);

orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperA);
orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperB);


Code with duplication (better latency):
(We duplicate the streams and then connect pricesStreamA with orderStreamA and 
pricesStreamB with orderStreamB, with keyBy as part of the connect, similar to 
above.)

//duplicate prices streams
SplitStream<Price> pricesSplitStream = pricesStream
        .split( price -> ImmutableList.of("pricesStreamA", "pricesStreamB") );
DataStream<Price> pricesStreamA = pricesSplitStream.select("pricesStreamA");
DataStream<Price> pricesStreamB = pricesSplitStream.select("pricesStreamB");

//duplicate orders streams
SplitStream<Order> ordersSplitStream = ordersStream
        .split( order -> ImmutableList.of("orderStreamA", "orderStreamB") );
DataStream<Order> orderStreamA = ordersSplitStream.select("orderStreamA");
DataStream<Order> orderStreamB = ordersSplitStream.select("orderStreamB");

DataStream priceOrdersConnectedStream =
        orderStreamA.connect(pricesStreamA).keyBy("priceId", "priceId")
        .flatMap(mapperA);
DataStream orderPricesConnectedStream =
        orderStreamB.connect(pricesStreamB).keyBy("orderId", "orderId")
        .flatMap(mapperB);



From: Timo Walther [mailto:twal...@apache.org]
Sent: Wednesday, January 03, 2018 11:02
To: user@flink.apache.org
Subject: Re: Two operators consuming from same stream

Hi Tovi,

I think your code without duplication performs two separate shuffle operations 
whereas the other code only performs one shuffle.

Further latency impacts might be due to the overhead involved in maintaining 
the partitioning for a keyed stream/key groups and switching key contexts in 
the operator.

Did you check the latency of the following?

DataStream<> ds = 
orderKeyedStream.connect(pricesKeyedStream).flatMap(identityMapper);
ds.flatMap(mapperA);
ds.flatMap(mapperB);

Regards,
Timo


On 1/1/18 at 2:50 PM, Sofer, Tovi wrote:
Hi group,

We have the following graph below, on which we added metrics for latency 
calculation.
We have two streams which are consumed by two operators:

· ordersStream and pricesStream – they are both consumed by two 
operators: CoMapperA and CoMapperB, each using connect.


Initially we thought that for a stream consumed by two operators we need to 
duplicate the stream into two separate streams, so we did it using split as 
below.
Then we understood it is not a must, and two operators can consume the same 
stream, so we removed the duplication.
However, when checking latency, we found that latency with duplicated streams 
was much better (about twice as good) than without duplication.

My questions:

· Is the improved latency related to checkpointing separately on those 
streams?

· What are the cons of using the duplication if it has better latency? 
Are we harming the state correctness in any way?

Additional Info:
The two graphs' configurations appear exactly the same in the execution plan/web UI:



[sourceOrders.keyBy,CoMapperA,OrdersStreams]


[sourcePrices.keyBy,CoMapperB,pricesStreams]







Code without duplication looks something like:
KeyedStream orderKeyedStream = ordersStream.keyBy(field);
KeyedStream pricesKeyedStream = pricesStream.keyBy(field);

orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperA);
orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperB);


Code used for duplication:
(We duplicate streams and then do connect of pricesStreamA with ordersStreamA, 
and pricesStreamA with ordersStreamB, and keyBy as part of connect, similar to 
above).
//duplicate prices streams
SplitStream pricesStream = pricesStream
.split( price -> ImmutableList.of("pricesStreamA "," 
pricesStreamB ") );

DataStream pricesStreamA = pricesStreams.select("pricesStreamA");
DataStream< Price > pricesStreamB= 
pricesStreams.select("pricesStreamB");


Thanks,
Tovi









Re: Parquet Format Read and Write

2018-01-04 Thread Aljoscha Krettek
A quick google search is turning up these results:

 - https://medium.com/@istanbul_techie/crunching-parquet-files-with-apache-flink-200bec90d8a7
 - https://github.com/FelixNeutatz/parquet-flinktacular
 - https://github.com/nezihyigitbasi/FlinkParquet

Best,
Aljoscha

> On 29. Dec 2017, at 18:13, Imran Tariq  
> wrote:
> 
> Hi,
> 
> I am new to Flink and exploring it. Just curious to know: does the Flink API 
> support the Parquet format like the Spark API, which can easily read and write Parquet?
> 
> -- 
> Thanks & Regards
> Muhammad Imran Tariq



Re: Service discovery for flink-metrics-prometheus

2018-01-04 Thread Aljoscha Krettek
I'm not aware of how this is typically done but maybe Chesnay (cc'ed) has an 
idea.

> On 14. Dec 2017, at 16:55, Kien Truong  wrote:
> 
> Hi,
> 
> Does anyone have recommendations about integrating flink-metrics-prometheus 
> with some SD mechanism
> 
> so that Prometheus can pick up the Task Manager's location dynamically ?
> 
> Best regards,
> 
> Kien
> 



Re: about the checkpoint and state backend

2018-01-04 Thread Aljoscha Krettek
TaskManagers don't do any checkpointing but Operators that run in TaskManagers 
do.

Each operator, of which there are multiple running on multiple TMs in the 
cluster will write to a unique DFS directory. Something like:
/checkpoints/job-xyz/checkpoint-1/operator-a/1

These individual checkpoints are not merged together into one directory but the 
handles to those directories are sent to the CheckpointCoordinator which 
creates a checkpoint that stores handles to all the states stored in DFS.

Best,
Aljoscha
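
For illustration, a minimal sketch of that setup (the paths are placeholders): the constructor argument is the DFS directory that the checkpoints are written to, while setDbStoragePath() only controls the local working directory of each TaskManager's RocksDB instances.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoints go to a distributed filesystem; 'true' enables incremental checkpoints.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("hdfs:///flink/checkpoints/my-job", true);

        // Local scratch directory for the RocksDB instances on each TaskManager.
        backend.setDbStoragePath("/tmp/flink/rocksdb");

        env.setStateBackend(backend);
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds

        env.fromElements(1, 2, 3).print(); // placeholder pipeline
        env.execute("rocksdb-backend-sketch");
    }
}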

> On 4. Jan 2018, at 15:06, Jinhua Luo  wrote:
> 
> One task manager would create one rocksdb instance on its local
> temporary dir, correct?
> Since there is likely multiple task managers for one cluster, so how
> they handle directory conflict, because one rocksdb instance is one
> directory, that is, what I mentioned at first, how they merge rocksdb
> instances and store it on the single distributed filesystem path?
> 
> 2018-01-04 22:00 GMT+08:00 Aljoscha Krettek :
>> Ah I see. Currently the RocksDB backend will use one column in RocksDB per 
>> state that is registered. The states for different keys of one state are 
>> stored in one column.
>> 
>>> On 4. Jan 2018, at 14:56, Jinhua Luo  wrote:
>>> 
>>> ok, I see.
>>> 
>>> But as known, one rocksdb instance occupy one directory, so I am still
>>> wondering what's the relationship between the states and rocksdb
>>> instances.
>>> 
>>> 2018-01-04 21:50 GMT+08:00 Aljoscha Krettek :
 Each operator (which run in a TaskManager) will write its state to some 
 location in HDFS (or any other DFS) and send a handle to this to the 
 CheckpointCoordinator (which is running on the JobManager). The 
 CheckpointCoordinator is collecting all the handles and creating one 
 Uber-Handle, which describes the complete checkpoint. When restoring, the 
 CheckpointCoordinator figures out which handles need to be sent to which 
 operators for restoring.
 
 Best,
 Aljoscha
 
> On 4. Jan 2018, at 14:44, Jinhua Luo  wrote:
> 
> OK, I think I get the point.
> 
> But another question raises: how task managers merge their rocksdb
> snapshot on a global single path?
> 
> 
> 2018-01-04 21:30 GMT+08:00 Aljoscha Krettek :
>> Hi,
>> 
>> The path you give to the constructor must be a path on some distributed 
>> filesystem, otherwise the data will be lost when the local machine 
>> crashes. As you mentioned correctly.
>> 
>> RocksDB will keep files in a local directory (you can specify this using 
>> setDbStoragePath()) and when checkpointing will write to the checkpoint 
>> directory that you specified in the constructor.
>> 
>> Best,
>> Aljoscha
>> 
>> 
>>> On 4. Jan 2018, at 14:23, Jinhua Luo  wrote:
>>> 
>>> I still do not understand the relationship between rocksdb backend and
>>> the filesystem (here I refer to any filesystem impl, including local,
>>> hdfs, s3).
>>> 
>>> For example, when I specify the path to rocksdb backend:
>>> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
>>> 
>>> What does it mean?
>>> 
>>> Each task manager would save states to /data1/flinkapp on its machine?
>>> But it seems no sense. Because when one of the machines crashes, the
>>> job manager could not access the states on dead machine.
>>> Or, each task manager creates rocksdb instance on temporary path, and
>>> send snapshots to job manager, then job manager in turn saves them on
>>> /data1/flinkapp on the job manager's machine?
>>> 
>>> Could you give the data flow example?
>>> 
>>> And another question is, when I turn off checkpointing (which is also
>>> default cfg), what happens to the states processing?
>>> 
>>> 
>>> 
>>> 2018-01-03 0:06 GMT+08:00 Timo Walther :
 Hi Jinhua,
 
 I will try to answer your questions:
 
 Flink checkpoints the state of each operator. For a Kafka consumer 
 operator
 this is only the offset. For other operators (such as Windows or a
 ProcessFunction) the values/list/maps stored in the state are 
 checkpointed.
 If you are interested in the internals, I would recommend this page 
 [1].
 Only the MemoryStateBackend sends entire states to the JobManager (see 
 [2]).
 But you are right, this is a bottleneck and not very fault-tolerant.
 Usually, Flink assumes to have some distributed file system (such as 
 HDFS)
 to which each Flink operator can be checkpointed in a fault-tolerant 
 way.
 For the RocksDbStateBackend the local files are copied to HDFS as 
 well. At
 the time of writing, only the RocksDBBackend supports incremental
 checkpoints. The JobManager can then read from HDFS and restore the 
 operator
 on a differ

Re: ElasticSearch Connector for version 6.x and scala 2.11

2018-01-04 Thread Nico Kruber
Actually, Flink's netty dependency (4.0.27) is shaded away into the
"org.apache.flink.shaded.netty4.io.netty" package now (since version
1.4) and should thus not clash anymore.
However, other netty versions may come into play from the job itself or
from the integration of Hadoop's classpath (if available).


Nico

On 01/12/17 14:42, Jens Oberender wrote:
> Hi
> 
> A workmate of mine tried to migrate the existing flink connector to
> ElasticSearch 6 but we had problems with netty dependencies that clashed
> (Flink uses 4.0.27 and ES is on 4.1).
> You can change the flink-connector-elasticsearch5 connector to ES 5.6.4,
> but then you have to do some adaptions to get it working, as they
> changed the API within a major version!
> But with that version you can write to ES 6.
> 
> He put his changes on Github:
> https://github.com/cognitix/flink/tree/connector-elasticsearch6
> It's called flink-connector-elasticsearch6, there.
> 
> We want to write a new and clean version for ElasticSearch 6, but
> currently don't have the time.
> 
> Best regards,
>   Jens Oberender
> 
> 
> On 01.12.2017 at 09:52, Fabian Hueske wrote:
>> Hi Rahul,
>>
>> Flink does not provide a connector for ElasticSearch 6 yet.
>> There is this JIRA issue to track the development progress [1].
>>
>> Best, Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-8101
>>
>> 2017-12-01 7:22 GMT+01:00 Rahul Raj > >:
>>
>> Hi All,
>>
>> Is there a Flink Elastic search connector for version 6.0 and scala
>> 2.11? I couldn't find it listed
>> here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/elasticsearch.html.
>>
>> Regards,
>> Rahul Raj
>>
>>
> 





Re: Queryable State Client - Actor Not found Exception

2018-01-04 Thread Velu Mitwa
Thank you Aljoscha.

I am able to query state when I use the hostname of the JobManager instead of
its IP address. But I couldn't understand why it is not working when I give the
IP address.

On Thu, Jan 4, 2018 at 6:42 PM, Aljoscha Krettek 
wrote:

> Hi,
>
> Is my-machine:52650 the correct address for the JobManager running in
> YARN? Also, Queryable State in Flink 1.3.2 is not easy to get to work when
> you use YARN with HA mode.
>
> Best,
> Aljoscha
>
> On 3. Jan 2018, at 16:02, Velu Mitwa  wrote:
>
> Hi,
> I am running a Flink Job which uses the Queryable State feature of Apache
> Flink(1.3.2). I was able to do that in local mode. When I try to do that in
> Cluster mode (Yarn Session), I am getting Actor not found Exception.
>
> Please help me to understand what is missing.
>
> *Exception Trace*
>
>
> Query failed because of the following Exception:
> akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.
> tcp://flink@my-machine:52650/), Path(/user/jobmanager)]
> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorS
> election.scala:65)
> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorS
> election.scala:63)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(Ba
> tchingExecutor.scala:55)
> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.
> scala:73)
> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.
> unbatchedExecute(Future.scala:74)
> at akka.dispatch.BatchingExecutor$class.execute(
> BatchingExecutor.scala:120)
> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.
> execute(Future.scala:73)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(
> Promise.scala:40)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Pro
> mise.scala:248)
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupp
> ort.scala:334)
> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> at scala.concurrent.Future$InternalCallbackExecutor$.scala$
> concurrent$Future$InternalCallbackExecutor$$unbatchedExecute
> (Future.scala:694)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(
> Future.scala:691)
> at akka.actor.LightArrayRevolverScheduler$TaskHolder.
> executeTask(Scheduler.scala:474)
> at akka.actor.LightArrayRevolverScheduler$$anon$8.
> executeBucket$1(Scheduler.scala:425)
> at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(
> Scheduler.scala:429)
> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(
> Scheduler.scala:381)
> at java.lang.Thread.run(Thread.java:745)
>
> *Client Creation Snippet*
>
> Configuration config = new Configuration();
> config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
>
> final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils
>     .createHighAvailabilityServices(config, Executors.newSingleThreadScheduledExecutor(),
>         HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
>
> this.client = new QueryableStateClient(config, highAvailabilityServices);
> }
>
>
>
>


Re: Separate checkpoint directories

2018-01-04 Thread Stefan Richter
Hi,

the state is checkpointed in subdirectories and with unique file names, so 
having all in one root directory is no problem. This all happens automatically.

As far as I know, there is no implementation that generates output paths for 
sinks like that. You could open a jira with a feature wish, though.

Best,
Stefan
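
For illustration, a minimal sketch of that setup in Java (bucket name, topic regex, and Kafka properties are placeholders; the pattern-accepting constructor is the 1.4 regex-subscription feature mentioned below, though the exact signature may differ):

import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class RegexTopicsSingleCheckpointDir {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // One checkpoint root for all topics; Flink creates unique subdirectories and file names.
        env.setStateBackend(new FsStateBackend("s3://some-bucket/checkpoints/"));
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "my-consumer-group");   // placeholder

        env.addSource(new FlinkKafkaConsumer011<>(
                Pattern.compile("topic-.*"),   // subscribe to every matching topic
                new SimpleStringSchema(),
                props))
           .print();                           // replace with the actual sink(s)

        env.execute("regex-topics-single-checkpoint-dir");
    }
}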

> On 03.01.2018 at 16:06, Kyle Hamlin wrote:
> 
> Hi Stefan,
> 
> In the past, I ran four separate Flink apps to sink data from four separate 
> Kafka topics to s3 without any transformations applied. For each Flink app, I 
> would set the checkpoint directory to 
> s3://some-bucket/checkpoints/topic-name. It appears that with Flink 1.4 I can 
> just use a regex to pick up all four topics, and so what you are telling me 
> is that even if my regex picks up 1000 Kafka topics the only checkpoint path 
> I need is s3://some-bucket/checkpoints/ and Flink will take care of the rest?
> 
> Additionally, I was wondering how this concept might extend to sinking the 
> data from this single Flink app that has picked up 1000 Kafka topics to 
> separate s3 paths? For instance:
> 
> s3://some-bucket/data/topic1/
> s3://some-bucket/data/topic2/
> .
> .
> .
> s3://some-bucket/data/topic1000/
> 
> Thanks very much for your help Stefan!
> 
> On Wed, Jan 3, 2018 at 10:51 AM Stefan Richter  > wrote:
> Hi,
> 
> first, let my ask why you want to have a different checkpoint directory per 
> topic? It is perfectly ok to have just a single checkpoint directory, so I 
> wonder what the intention is? Flink will already create proper subdirectories 
> and filenames and can identify the right checkpoint data for each operator 
> instance.
> 
> Best,
> Stefan
> 
> 
>> Am 31.12.2017 um 17:10 schrieb Kyle Hamlin > >:
>> 
>> Flink 1.4 added regex pattern matching for FlinkKafkaConsumer's which is a 
>> neat feature. I would like to use this feature, but I'm wondering how that 
>> impacts the FsStateBackend checkpointing mechanism. Before I would subscribe 
>> to one topic and set a checkpoint path specific to that topic for example if 
>> the Kafka topic name was foo:
>> 
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.setStateBackend(new FsStateBackend("s3://checkpoints/foo/"))
>> 
>> How does one dynamically set these checkpoint paths? Is it even necessary to 
>> do so, should I have one checkpoint path for all the possible topics the 
>> regex pattern could pick up?
> 



Re: does the flink sink only support bio?

2018-01-04 Thread Stefan Richter
Yes, that is how it works.

> On 04.01.2018 at 14:47, Jinhua Luo wrote:
> 
> The TwoPhaseCommitSinkFunction seems to record the transaction status
> in the state just like what I imagined above, correct?
> And if the process fails before the commit, the commit would be triggered
> again on the later restart, correct? So the commit would not be
> forgotten, correct?
> 
> 2018-01-03 22:54 GMT+08:00 Stefan Richter :
>> I think a mix of async UPDATES and exactly-once all this might be tricky,
>> and the typical use case for async IO is more about reads. So let’s take a
>> step back: what would you like to achieve with this? Do you want a
>> read-modify-update (e.g. a map function that queries and updates a DB) or
>> just updates (like a sink based that goes against a DB). From the previous
>> question, I assume the second case applies, in which case I wonder why you
>> even need to be async for a sink? I think a much better approach could be
>> based on Flink's TwoPhaseCommitSinkFunction, and maybe use some some
>> batching to lower update costs.
>> 
>> On top of the TwoPhaseCommitSinkFunction, you could implement transactions
>> against your DB, similar to e.g. this example with Postgres:
>> http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/
>> .
>> 
>> Does this help or do you really need async read-modify-update?
>> 
>> Best,
>> Stefan
>> 
>> 
>> Am 03.01.2018 um 15:08 schrieb Jinhua Luo :
>> 
>> No, I mean how to implement exactly-once db commit (given our async io
>> target is mysql), not the state used by flink.
>> As mentioned in previous mail, if I commit db in
>> notifyCheckpointComplete, we have a risk to lost data (lost commit,
>> and flink restart would not trigger notifyCheckpointComplete for the
>> last checkpoint again).
>> On the other hand, if I update and commit per record, the sql/stored
>> procedure have to handle duplicate updates at failure restart.
>> 
>> So, when or where to commit so that we could get exactly-once db ingress.
>> 
>> 2018-01-03 21:57 GMT+08:00 Stefan Richter :
>> 
>> 
>> Hi,
>> 
>> 
>> Then how to implement exactly-once async io? That is, neither missing
>> data or duplicating data.
>> 
>> 
>> From the docs about async IO here
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
>> :
>> 
>> "Fault Tolerance Guarantees:
>> The asynchronous I/O operator offers full exactly-once fault tolerance
>> guarantees. It stores the records for in-flight asynchronous requests in
>> checkpoints and restores/re-triggers the requests when recovering from a
>> failure.“
>> 
>> So it is already handled by Flink in a way that supports exactly-once.
>> 
>> Is there some way to index data by checkpoint id and records which
>> checkpoints already commit to db? But that means we need MapState,
>> right?
>> 
>> 
>> The information required depends a bit on the store that you are using,
>> maybe the last confirmed checkpoint id is enough, but maybe you require
>> something more. This transaction information is probably not „by-key“, but
>> „per-operator“, so I would suggest to use operator state (see next answer).
>> Btw the implementation of async operators does something very similar to
>> restore pending requests, and you can see the code in „AsyncWaitOperator".
>> 
>> 
>> However, the async-io operator normally follows other operators, e.g.
>> fold, so it normally faces the DataStream but not KeyedStream, and
>> DataStream only supports ListState, right?
>> 
>> 
>> You can use non-keyed state, aka operator state, to store such information.
>> See here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state
>> . It does not require a KeyedSteam.
>> 
>> Best,
>> Stefan
>> 
>> 
>> 
>> 2018-01-03 18:43 GMT+08:00 Stefan Richter :
>> 
>> 
>> 
>> Am 01.01.2018 um 15:22 schrieb Jinhua Luo :
>> 
>> 2017-12-08 18:25 GMT+08:00 Stefan Richter :
>> 
>> You need to be a bit careful if your sink needs exactly-once semantics. In
>> this case things should either be idempotent or the db must support rolling
>> back changes between checkpoints, e.g. via transactions. Commits should be
>> triggered for confirmed checkpoints („notifyCheckpointComplete“).
>> 
>> 
>> I doubt if we have a risk here: in notifyCheckpointComplete, the
>> checkpoint was completed, and if the process crashes (or machine
>> failure) before it commits the db, the flink would restart the app,
>> restoring the state from the last checkpoint, but it would not invoke
>> notifyCheckpointComplete again? correct? if so, we would miss the
>> database ingress for the data between the last two checkpoints, am I
>> correct?
>> 
>> 
>> Yes, that is correct. What I was talking about was more the opposite
>> problem,i.e. committing too early. In that case, you could have committed
>> for a checkpoint that failed afterwards, and recovery will start from an
>> earlier checkpoint but with your commit already appl

Re: about the checkpoint and state backend

2018-01-04 Thread Jinhua Luo
One task manager would create one RocksDB instance in its local
temporary dir, correct?
Since there are likely multiple task managers in one cluster, how do they
handle directory conflicts? One RocksDB instance is one directory, so,
as I mentioned at first, how do they merge the RocksDB instances and store
them on the single distributed filesystem path?

2018-01-04 22:00 GMT+08:00 Aljoscha Krettek :
> Ah I see. Currently the RocksDB backend will use one column in RocksDB per 
> state that is registered. The states for different keys of one state are 
> stored in one column.
>
>> On 4. Jan 2018, at 14:56, Jinhua Luo  wrote:
>>
>> ok, I see.
>>
>> But as known, one rocksdb instance occupy one directory, so I am still
>> wondering what's the relationship between the states and rocksdb
>> instances.
>>
>> 2018-01-04 21:50 GMT+08:00 Aljoscha Krettek :
>>> Each operator (which run in a TaskManager) will write its state to some 
>>> location in HDFS (or any other DFS) and send a handle to this to the 
>>> CheckpointCoordinator (which is running on the JobManager). The 
>>> CheckpointCoordinator is collecting all the handles and creating one 
>>> Uber-Handle, which describes the complete checkpoint. When restoring, the 
>>> CheckpointCoordinator figures out which handles need to be sent to which 
>>> operators for restoring.
>>>
>>> Best,
>>> Aljoscha
>>>
 On 4. Jan 2018, at 14:44, Jinhua Luo  wrote:

 OK, I think I get the point.

 But another question raises: how task managers merge their rocksdb
 snapshot on a global single path?


 2018-01-04 21:30 GMT+08:00 Aljoscha Krettek :
> Hi,
>
> The path you give to the constructor must be a path on some distributed 
> filesystem, otherwise the data will be lost when the local machine 
> crashes. As you mentioned correctly.
>
> RocksDB will keep files in a local directory (you can specify this using 
> setDbStoragePath()) and when checkpointing will write to the checkpoint 
> directory that you specified in the constructor.
>
> Best,
> Aljoscha
>
>
>> On 4. Jan 2018, at 14:23, Jinhua Luo  wrote:
>>
>> I still do not understand the relationship between rocksdb backend and
>> the filesystem (here I refer to any filesystem impl, including local,
>> hdfs, s3).
>>
>> For example, when I specify the path to rocksdb backend:
>> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
>>
>> What does it mean?
>>
>> Each task manager would save states to /data1/flinkapp on its machine?
>> But it seems no sense. Because when one of the machines crashes, the
>> job manager could not access the states on dead machine.
>> Or, each task manager creates rocksdb instance on temporary path, and
>> send snapshots to job manager, then job manager in turn saves them on
>> /data1/flinkapp on the job manager's machine?
>>
>> Could you give the data flow example?
>>
>> And another question is, when I turn off checkpointing (which is also
>> default cfg), what happens to the states processing?
>>
>>
>>
>> 2018-01-03 0:06 GMT+08:00 Timo Walther :
>>> Hi Jinhua,
>>>
>>> I will try to answer your questions:
>>>
>>> Flink checkpoints the state of each operator. For a Kafka consumer 
>>> operator
>>> this is only the offset. For other operators (such as Windows or a
>>> ProcessFunction) the values/list/maps stored in the state are 
>>> checkpointed.
>>> If you are interested in the internals, I would recommend this page [1].
>>> Only the MemoryStateBackend sends entire states to the JobManager (see 
>>> [2]).
>>> But you are right, this is a bottleneck and not very fault-tolerant.
>>> Usually, Flink assumes to have some distributed file system (such as 
>>> HDFS)
>>> to which each Flink operator can be checkpointed in a fault-tolerant 
>>> way.
>>> For the RocksDbStateBackend the local files are copied to HDFS as well. 
>>> At
>>> the time of writing, only the RocksDBBackend supports incremental
>>> checkpoints. The JobManager can then read from HDFS and restore the 
>>> operator
>>> on a different machine.
>>>
>>> Feel free to ask further questions.
>>>
>>> Regards,
>>> Timo
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
>>>
>>>
>>>
>>> Am 1/1/18 um 3:50 PM schrieb Jinhua Luo:
>>>
 Hi All,

 I have two questions:

 a) does the records/elements themselves would be checkpointed? or just
 record offset checkpointed? That is, what data included in the
 checkpoint except for states?

 b) where flink stores

Re: about the checkpoint and state backend

2018-01-04 Thread Aljoscha Krettek
Ah I see. Currently the RocksDB backend will use one column in RocksDB per 
state that is registered. The states for different keys of one state are stored 
in one column.
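
For illustration, a minimal sketch of registering a keyed state (names and types made up): each registered descriptor corresponds to one such column (family), and the values for all keys of that state live in it.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Used as: stream.keyBy(0).flatMap(new RunningSum())
public class RunningSum extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    // One registered state -> one column (family) in RocksDB, holding the values for all keys.
    private transient ValueState<Long> sum;

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> in, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = sum.value();            // automatically scoped to the current key
        long updated = (current == null ? 0L : current) + in.f1;
        sum.update(updated);
        out.collect(Tuple2.of(in.f0, updated));
    }
}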

> On 4. Jan 2018, at 14:56, Jinhua Luo  wrote:
> 
> ok, I see.
> 
> But as known, one rocksdb instance occupy one directory, so I am still
> wondering what's the relationship between the states and rocksdb
> instances.
> 
> 2018-01-04 21:50 GMT+08:00 Aljoscha Krettek :
>> Each operator (which run in a TaskManager) will write its state to some 
>> location in HDFS (or any other DFS) and send a handle to this to the 
>> CheckpointCoordinator (which is running on the JobManager). The 
>> CheckpointCoordinator is collecting all the handles and creating one 
>> Uber-Handle, which describes the complete checkpoint. When restoring, the 
>> CheckpointCoordinator figures out which handles need to be sent to which 
>> operators for restoring.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 4. Jan 2018, at 14:44, Jinhua Luo  wrote:
>>> 
>>> OK, I think I get the point.
>>> 
>>> But another question raises: how task managers merge their rocksdb
>>> snapshot on a global single path?
>>> 
>>> 
>>> 2018-01-04 21:30 GMT+08:00 Aljoscha Krettek :
 Hi,
 
 The path you give to the constructor must be a path on some distributed 
 filesystem, otherwise the data will be lost when the local machine 
 crashes. As you mentioned correctly.
 
 RocksDB will keep files in a local directory (you can specify this using 
 setDbStoragePath()) and when checkpointing will write to the checkpoint 
 directory that you specified in the constructor.
 
 Best,
 Aljoscha
 
 
> On 4. Jan 2018, at 14:23, Jinhua Luo  wrote:
> 
> I still do not understand the relationship between rocksdb backend and
> the filesystem (here I refer to any filesystem impl, including local,
> hdfs, s3).
> 
> For example, when I specify the path to rocksdb backend:
> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
> 
> What does it mean?
> 
> Each task manager would save states to /data1/flinkapp on its machine?
> But it seems no sense. Because when one of the machines crashes, the
> job manager could not access the states on dead machine.
> Or, each task manager creates rocksdb instance on temporary path, and
> send snapshots to job manager, then job manager in turn saves them on
> /data1/flinkapp on the job manager's machine?
> 
> Could you give the data flow example?
> 
> And another question is, when I turn off checkpointing (which is also
> default cfg), what happens to the states processing?
> 
> 
> 
> 2018-01-03 0:06 GMT+08:00 Timo Walther :
>> Hi Jinhua,
>> 
>> I will try to answer your questions:
>> 
>> Flink checkpoints the state of each operator. For a Kafka consumer 
>> operator
>> this is only the offset. For other operators (such as Windows or a
>> ProcessFunction) the values/list/maps stored in the state are 
>> checkpointed.
>> If you are interested in the internals, I would recommend this page [1].
>> Only the MemoryStateBackend sends entire states to the JobManager (see 
>> [2]).
>> But you are right, this is a bottleneck and not very fault-tolerant.
>> Usually, Flink assumes to have some distributed file system (such as 
>> HDFS)
>> to which each Flink operator can be checkpointed in a fault-tolerant way.
>> For the RocksDbStateBackend the local files are copied to HDFS as well. 
>> At
>> the time of writing, only the RocksDBBackend supports incremental
>> checkpoints. The JobManager can then read from HDFS and restore the 
>> operator
>> on a different machine.
>> 
>> Feel free to ask further questions.
>> 
>> Regards,
>> Timo
>> 
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
>> 
>> 
>> 
>> Am 1/1/18 um 3:50 PM schrieb Jinhua Luo:
>> 
>>> Hi All,
>>> 
>>> I have two questions:
>>> 
>>> a) does the records/elements themselves would be checkpointed? or just
>>> record offset checkpointed? That is, what data included in the
>>> checkpoint except for states?
>>> 
>>> b) where flink stores the state globally? so that the job manager
>>> could restore them on each task manger at failure restart.
>>> 
>>> For the heap backend, all task managers would send states to job
>>> manager, and job manager would save it in its heap, correct?
>>> 
>>> For the fs/rocksdb backend, all task managers would save states
>>> (incrementally or not) in local path temporarily, and send them (in
>>> rocksdb snapshot format for the rocksdb case?) to the job manager at
>>> checkpoint?

Re: about the checkpoint and state backend

2018-01-04 Thread Jinhua Luo
OK, I see.

But as far as I know, one RocksDB instance occupies one directory, so I am still
wondering what the relationship is between the states and the RocksDB
instances.

2018-01-04 21:50 GMT+08:00 Aljoscha Krettek :
> Each operator (which run in a TaskManager) will write its state to some 
> location in HDFS (or any other DFS) and send a handle to this to the 
> CheckpointCoordinator (which is running on the JobManager). The 
> CheckpointCoordinator is collecting all the handles and creating one 
> Uber-Handle, which describes the complete checkpoint. When restoring, the 
> CheckpointCoordinator figures out which handles need to be sent to which 
> operators for restoring.
>
> Best,
> Aljoscha
>
>> On 4. Jan 2018, at 14:44, Jinhua Luo  wrote:
>>
>> OK, I think I get the point.
>>
>> But another question raises: how task managers merge their rocksdb
>> snapshot on a global single path?
>>
>>
>> 2018-01-04 21:30 GMT+08:00 Aljoscha Krettek :
>>> Hi,
>>>
>>> The path you give to the constructor must be a path on some distributed 
>>> filesystem, otherwise the data will be lost when the local machine crashes. 
>>> As you mentioned correctly.
>>>
>>> RocksDB will keep files in a local directory (you can specify this using 
>>> setDbStoragePath()) and when checkpointing will write to the checkpoint 
>>> directory that you specified in the constructor.
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
 On 4. Jan 2018, at 14:23, Jinhua Luo  wrote:

 I still do not understand the relationship between rocksdb backend and
 the filesystem (here I refer to any filesystem impl, including local,
 hdfs, s3).

 For example, when I specify the path to rocksdb backend:
 env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));

 What does it mean?

 Each task manager would save states to /data1/flinkapp on its machine?
 But it seems no sense. Because when one of the machines crashes, the
 job manager could not access the states on dead machine.
 Or, each task manager creates rocksdb instance on temporary path, and
 send snapshots to job manager, then job manager in turn saves them on
 /data1/flinkapp on the job manager's machine?

 Could you give the data flow example?

 And another question is, when I turn off checkpointing (which is also
 default cfg), what happens to the states processing?



 2018-01-03 0:06 GMT+08:00 Timo Walther :
> Hi Jinhua,
>
> I will try to answer your questions:
>
> Flink checkpoints the state of each operator. For a Kafka consumer 
> operator
> this is only the offset. For other operators (such as Windows or a
> ProcessFunction) the values/list/maps stored in the state are 
> checkpointed.
> If you are interested in the internals, I would recommend this page [1].
> Only the MemoryStateBackend sends entire states to the JobManager (see 
> [2]).
> But you are right, this is a bottleneck and not very fault-tolerant.
> Usually, Flink assumes to have some distributed file system (such as HDFS)
> to which each Flink operator can be checkpointed in a fault-tolerant way.
> For the RocksDbStateBackend the local files are copied to HDFS as well. At
> the time of writing, only the RocksDBBackend supports incremental
> checkpoints. The JobManager can then read from HDFS and restore the 
> operator
> on a different machine.
>
> Feel free to ask further questions.
>
> Regards,
> Timo
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
>
>
>
> Am 1/1/18 um 3:50 PM schrieb Jinhua Luo:
>
>> Hi All,
>>
>> I have two questions:
>>
>> a) does the records/elements themselves would be checkpointed? or just
>> record offset checkpointed? That is, what data included in the
>> checkpoint except for states?
>>
>> b) where flink stores the state globally? so that the job manager
>> could restore them on each task manger at failure restart.
>>
>> For the heap backend, all task managers would send states to job
>> manager, and job manager would save it in its heap, correct?
>>
>> For the fs/rocksdb backend, all task managers would save states
>> (incrementally or not) in local path temporarily, and send them (in
>> rocksdb snapshot format for the rocksdb case?) to the job manager at
>> checkpoint?
>>
>> The path we used to configure backend is the path on the job manager
>> machine but not on the task managers' machines? So that's the
>> bottleneck and single failure point? So it's better to use hdfs path
>> so that we could scale the storage and make it high availability as
>> well?
>>
>> Thank you all.
>
>
>
>>>
>


Re: about the checkpoint and state backend

2018-01-04 Thread Aljoscha Krettek
Each operator (which runs in a TaskManager) will write its state to some 
location in HDFS (or any other DFS) and send a handle to it to the 
CheckpointCoordinator (which is running on the JobManager). The 
CheckpointCoordinator collects all the handles and creates one 
Uber-Handle, which describes the complete checkpoint. When restoring, the 
CheckpointCoordinator figures out which handles need to be sent to which 
operators.

Best,
Aljoscha

> On 4. Jan 2018, at 14:44, Jinhua Luo  wrote:
> 
> OK, I think I get the point.
> 
> But another question raises: how task managers merge their rocksdb
> snapshot on a global single path?
> 
> 
> 2018-01-04 21:30 GMT+08:00 Aljoscha Krettek :
>> Hi,
>> 
>> The path you give to the constructor must be a path on some distributed 
>> filesystem, otherwise the data will be lost when the local machine crashes. 
>> As you mentioned correctly.
>> 
>> RocksDB will keep files in a local directory (you can specify this using 
>> setDbStoragePath()) and when checkpointing will write to the checkpoint 
>> directory that you specified in the constructor.
>> 
>> Best,
>> Aljoscha
>> 
>> 
>>> On 4. Jan 2018, at 14:23, Jinhua Luo  wrote:
>>> 
>>> I still do not understand the relationship between rocksdb backend and
>>> the filesystem (here I refer to any filesystem impl, including local,
>>> hdfs, s3).
>>> 
>>> For example, when I specify the path to rocksdb backend:
>>> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
>>> 
>>> What does it mean?
>>> 
>>> Each task manager would save states to /data1/flinkapp on its machine?
>>> But it seems no sense. Because when one of the machines crashes, the
>>> job manager could not access the states on dead machine.
>>> Or, each task manager creates rocksdb instance on temporary path, and
>>> send snapshots to job manager, then job manager in turn saves them on
>>> /data1/flinkapp on the job manager's machine?
>>> 
>>> Could you give the data flow example?
>>> 
>>> And another question is, when I turn off checkpointing (which is also
>>> default cfg), what happens to the states processing?
>>> 
>>> 
>>> 
>>> 2018-01-03 0:06 GMT+08:00 Timo Walther :
 Hi Jinhua,
 
 I will try to answer your questions:
 
 Flink checkpoints the state of each operator. For a Kafka consumer operator
 this is only the offset. For other operators (such as Windows or a
 ProcessFunction) the values/list/maps stored in the state are checkpointed.
 If you are interested in the internals, I would recommend this page [1].
 Only the MemoryStateBackend sends entire states to the JobManager (see 
 [2]).
 But you are right, this is a bottleneck and not very fault-tolerant.
 Usually, Flink assumes to have some distributed file system (such as HDFS)
 to which each Flink operator can be checkpointed in a fault-tolerant way.
 For the RocksDbStateBackend the local files are copied to HDFS as well. At
 the time of writing, only the RocksDBBackend supports incremental
 checkpoints. The JobManager can then read from HDFS and restore the 
 operator
 on a different machine.
 
 Feel free to ask further questions.
 
 Regards,
 Timo
 
 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
 [2]
 https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
 
 
 
 Am 1/1/18 um 3:50 PM schrieb Jinhua Luo:
 
> Hi All,
> 
> I have two questions:
> 
> a) does the records/elements themselves would be checkpointed? or just
> record offset checkpointed? That is, what data included in the
> checkpoint except for states?
> 
> b) where flink stores the state globally? so that the job manager
> could restore them on each task manger at failure restart.
> 
> For the heap backend, all task managers would send states to job
> manager, and job manager would save it in its heap, correct?
> 
> For the fs/rocksdb backend, all task managers would save states
> (incrementally or not) in local path temporarily, and send them (in
> rocksdb snapshot format for the rocksdb case?) to the job manager at
> checkpoint?
> 
> The path we used to configure backend is the path on the job manager
> machine but not on the task managers' machines? So that's the
> bottleneck and single failure point? So it's better to use hdfs path
> so that we could scale the storage and make it high availability as
> well?
> 
> Thank you all.
 
 
 
>> 



Re: does the flink sink only support bio?

2018-01-04 Thread Jinhua Luo
The TwoPhaseCommitSinkFunction seems to record the transaction status
in state, just like what I imagined above, correct?
And if the process fails before the commit, the commit would be
triggered again on the later restart, correct? So the commit would not be
forgotten, correct?
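
For reference, a minimal sketch (not from this thread; the names are made up) of
the pattern discussed below: pending rows are kept in operator state so they
survive a restore, and are committed only once notifyCheckpointComplete()
confirms the checkpoint. It is simplified (rows that arrive between the snapshot
and the notification are ignored), and it still has the gap discussed below —
a crash between the notification and the database commit loses the commit —
which is exactly what TwoPhaseCommitSinkFunction is designed to close:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BufferingDbSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    // Rows received since the last checkpoint; flushed to the DB only after the
    // checkpoint that contains them has been confirmed.
    private transient List<String> pendingRows;
    private transient ListState<String> checkpointedRows;

    @Override
    public void invoke(String value) {
        pendingRows.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Make the pending rows part of the checkpoint (operator state, not keyed state).
        checkpointedRows.clear();
        for (String row : pendingRows) {
            checkpointedRows.add(row);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedRows = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("pending-rows", String.class));
        pendingRows = new ArrayList<>();
        if (context.isRestored()) {
            for (String row : checkpointedRows.get()) {
                pendingRows.add(row);
            }
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Commit only after the checkpoint is confirmed. If the process dies right
        // here, the commit is lost on restart -- the gap 2PC sinks close.
        commitToDatabase(pendingRows); // hypothetical helper
        pendingRows.clear();
    }

    private void commitToDatabase(List<String> rows) {
        // placeholder for a real transactional DB write
    }
}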

2018-01-03 22:54 GMT+08:00 Stefan Richter :
> I think a mix of async UPDATES and exactly-once all this might be tricky,
> and the typical use case for async IO is more about reads. So let’s take a
> step back: what would you like to achieve with this? Do you want a
> read-modify-update (e.g. a map function that queries and updates a DB) or
> just updates (like a sink based that goes against a DB). From the previous
> question, I assume the second case applies, in which case I wonder why you
> even need to be async for a sink? I think a much better approach could be
> based on Flink's TwoPhaseCommitSinkFunction, and maybe use some some
> batching to lower update costs.
>
> On top of the TwoPhaseCommitSinkFunction, you could implement transactions
> against your DB, similar to e.g. this example with Postgres:
> http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/
> .
>
> Does this help or do you really need async read-modify-update?
>
> Best,
> Stefan
>
>
> Am 03.01.2018 um 15:08 schrieb Jinhua Luo :
>
> No, I mean how to implement exactly-once db commit (given our async io
> target is mysql), not the state used by flink.
> As mentioned in previous mail, if I commit db in
> notifyCheckpointComplete, we have a risk to lost data (lost commit,
> and flink restart would not trigger notifyCheckpointComplete for the
> last checkpoint again).
> On the other hand, if I update and commit per record, the sql/stored
> procedure have to handle duplicate updates at failure restart.
>
> So, when or where to commit so that we could get exactly-once db ingress.
>
> 2018-01-03 21:57 GMT+08:00 Stefan Richter :
>
>
> Hi,
>
>
> Then how to implement exactly-once async io? That is, neither missing
> data or duplicating data.
>
>
> From the docs about async IO here
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
> :
>
> "Fault Tolerance Guarantees:
> The asynchronous I/O operator offers full exactly-once fault tolerance
> guarantees. It stores the records for in-flight asynchronous requests in
> checkpoints and restores/re-triggers the requests when recovering from a
> failure.“
>
> So it is already handled by Flink in a way that supports exactly-once.
>
> Is there some way to index data by checkpoint id and records which
> checkpoints already commit to db? But that means we need MapState,
> right?
>
>
> The information required depends a bit on the store that you are using,
> maybe the last confirmed checkpoint id is enough, but maybe you require
> something more. This transaction information is probably not „by-key“, but
> „per-operator“, so I would suggest to use operator state (see next answer).
> Btw the implementation of async operators does something very similar to
> restore pending requests, and you can see the code in „AsyncWaitOperator".
>
>
> However, the async-io operator normally follows other operators, e.g.
> fold, so it normally faces the DataStream but not KeyedStream, and
> DataStream only supports ListState, right?
>
>
> You can use non-keyed state, aka operator state, to store such information.
> See here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state
> . It does not require a KeyedSteam.
>
> Best,
> Stefan
>
>
>
> 2018-01-03 18:43 GMT+08:00 Stefan Richter :
>
>
>
> Am 01.01.2018 um 15:22 schrieb Jinhua Luo :
>
> 2017-12-08 18:25 GMT+08:00 Stefan Richter :
>
> You need to be a bit careful if your sink needs exactly-once semantics. In
> this case things should either be idempotent or the db must support rolling
> back changes between checkpoints, e.g. via transactions. Commits should be
> triggered for confirmed checkpoints („notifyCheckpointComplete“).
>
>
> I doubt if we have a risk here: in notifyCheckpointComplete, the
> checkpoint was completed, and if the process crashes (or machine
> failure) before it commits the db, the flink would restart the app,
> restoring the state from the last checkpoint, but it would not invoke
> notifyCheckpointComplete again? correct? if so, we would miss the
> database ingress for the data between the last two checkpoints, am I
> correct?
>
>
> Yes, that is correct. What I was talking about was more the opposite
> problem,i.e. committing too early. In that case, you could have committed
> for a checkpoint that failed afterwards, and recovery will start from an
> earlier checkpoint but with your commit already applied. You should only
> commit after you received the notification or else your semantics can be
> down to „at-least-once".
>
>
>


Re: about the checkpoint and state backend

2018-01-04 Thread Jinhua Luo
OK, I think I get the point.

But another question arises: how do the task managers merge their RocksDB
snapshots into a single global path?


2018-01-04 21:30 GMT+08:00 Aljoscha Krettek :
> Hi,
>
> The path you give to the constructor must be a path on some distributed 
> filesystem, otherwise the data will be lost when the local machine crashes. 
> As you mentioned correctly.
>
> RocksDB will keep files in a local directory (you can specify this using 
> setDbStoragePath()) and when checkpointing will write to the checkpoint 
> directory that you specified in the constructor.
>
> Best,
> Aljoscha
>
>
>> On 4. Jan 2018, at 14:23, Jinhua Luo  wrote:
>>
>> I still do not understand the relationship between rocksdb backend and
>> the filesystem (here I refer to any filesystem impl, including local,
>> hdfs, s3).
>>
>> For example, when I specify the path to rocksdb backend:
>> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
>>
>> What does it mean?
>>
>> Each task manager would save states to /data1/flinkapp on its machine?
>> But it seems no sense. Because when one of the machines crashes, the
>> job manager could not access the states on dead machine.
>> Or, each task manager creates rocksdb instance on temporary path, and
>> send snapshots to job manager, then job manager in turn saves them on
>> /data1/flinkapp on the job manager's machine?
>>
>> Could you give the data flow example?
>>
>> And another question is, when I turn off checkpointing (which is also
>> default cfg), what happens to the states processing?
>>
>>
>>
>> 2018-01-03 0:06 GMT+08:00 Timo Walther :
>>> Hi Jinhua,
>>>
>>> I will try to answer your questions:
>>>
>>> Flink checkpoints the state of each operator. For a Kafka consumer operator
>>> this is only the offset. For other operators (such as Windows or a
>>> ProcessFunction) the values/list/maps stored in the state are checkpointed.
>>> If you are interested in the internals, I would recommend this page [1].
>>> Only the MemoryStateBackend sends entire states to the JobManager (see [2]).
>>> But you are right, this is a bottleneck and not very fault-tolerant.
>>> Usually, Flink assumes to have some distributed file system (such as HDFS)
>>> to which each Flink operator can be checkpointed in a fault-tolerant way.
>>> For the RocksDbStateBackend the local files are copied to HDFS as well. At
>>> the time of writing, only the RocksDBBackend supports incremental
>>> checkpoints. The JobManager can then read from HDFS and restore the operator
>>> on a different machine.
>>>
>>> Feel free to ask further questions.
>>>
>>> Regards,
>>> Timo
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
>>>
>>>
>>>
>>> Am 1/1/18 um 3:50 PM schrieb Jinhua Luo:
>>>
 Hi All,

 I have two questions:

 a) does the records/elements themselves would be checkpointed? or just
 record offset checkpointed? That is, what data included in the
 checkpoint except for states?

 b) where flink stores the state globally? so that the job manager
 could restore them on each task manger at failure restart.

 For the heap backend, all task managers would send states to job
 manager, and job manager would save it in its heap, correct?

 For the fs/rocksdb backend, all task managers would save states
 (incrementally or not) in local path temporarily, and send them (in
 rocksdb snapshot format for the rocksdb case?) to the job manager at
 checkpoint?

 The path we used to configure backend is the path on the job manager
 machine but not on the task managers' machines? So that's the
 bottleneck and single failure point? So it's better to use hdfs path
 so that we could scale the storage and make it high availability as
 well?

 Thank you all.
>>>
>>>
>>>
>


Re: custom writer fail to recover

2018-01-04 Thread Aljoscha Krettek
Hi,

Which version of Flink is this? It cannot recover because it expects more data 
to have been written than is there, which seems to indicate that flushing did 
not work correctly.

Best,
Aljoscha

> On 19. Dec 2017, at 00:40, xiatao123  wrote:
> 
> Hi Das,
>  Have you got your .pending issue resolved? I am running into the same
> issue where the parquet files are all in pending status.
>  Please help to share your solutions.
> Thanks,
> Tao
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: keyby() issue

2018-01-04 Thread Jinhua Luo
I mean the timeout most likely happens in the sending queue of the
Redis lib if the concurrency number is low.


org.apache.flink.streaming.api.operators.async.AsyncWaitOperator#processElement(StreamRecord):


public void processElement(StreamRecord<IN> element) throws Exception {
    final StreamRecordQueueEntry<OUT> streamRecordBufferEntry =
        new StreamRecordQueueEntry<>(element);

    if (timeout > 0L) {
        // register a timeout for this AsyncStreamRecordBufferEntry
        long timeoutTimestamp = timeout +
            getProcessingTimeService().getCurrentProcessingTime();

        final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
            timeoutTimestamp,
            new ProcessingTimeCallback() {
                @Override
                public void onProcessingTime(long timestamp) throws Exception {
                    streamRecordBufferEntry.completeExceptionally(
                        new TimeoutException("Async function call has timed out."));
                }
            });

        // Cancel the timer once we've completed the stream record buffer entry.
        // This will remove the registered trigger task.
        streamRecordBufferEntry.onComplete(
            (StreamElementQueueEntry<Collection<OUT>> value) -> {
                timerFuture.cancel(true);
            },
            executor);
    }

    addAsyncBufferEntry(streamRecordBufferEntry);

    userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
}

-


The timer is set even while the entry is waiting for a free queue slot.
That seems like a bug: if the timeout fires before
addAsyncBufferEntry() returns, the operator still steps into asyncInvoke()
anyway, but the "future" has already been failed and times out immediately.


2018-01-04 21:31 GMT+08:00 Jinhua Luo :
> 2018-01-04 21:04 GMT+08:00 Aljoscha Krettek :
>> Memory usage should grow linearly with the number of windows you have active 
>> at any given time, the number of keys and the number of different window 
>> operations you have.
>
> But the memory usage is still too much, especially when the
> incremental aggregation is used.
>
>> Regarding the async I/O writing to redis, I see that you give a capacity of 
>> 1 which means that there will possibly be 1 concurrent connections 
>> to Redis. This might be a bit to much so could you try reducing that to 
>> avoid timeouts?
>
> It's not related to that part. In fact, I commented the async io codes
> and test, the memory usage is almost the same.
>
> And, on the contrary, I need to increase the concurrency number,
> because I have totally millions of aggregation results to sent per
> min!
> If the number is low, it would trigger timeout (yes, even the timeout
> value is 30 seconds, I think it's related to the single connection
> model of lettuce lib).


Re: Flink support for Microsoft Windows

2018-01-04 Thread Aljoscha Krettek
We have one committer (and PMC member) who is developing on Windows, but I'm not 
aware of people running Flink on Windows in production. There probably are some 
that I'm not aware of, though.

I'm cc'ing Chesnay who is the Flink Windows guy.

Best,
Aljoscha

> On 19. Dec 2017, at 09:02, avivros  wrote:
> 
> Hi All,
> 
> Does Flink fully supports MS Windows?
> Is anyone running Flink on MS Windows in Production environment?
> 
> Regards,
> Aviv
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: keyby() issue

2018-01-04 Thread Jinhua Luo
2018-01-04 21:04 GMT+08:00 Aljoscha Krettek :
> Memory usage should grow linearly with the number of windows you have active 
> at any given time, the number of keys and the number of different window 
> operations you have.

But the memory usage is still too much, especially when the
incremental aggregation is used.

> Regarding the async I/O writing to redis, I see that you give a capacity of 
> 1 which means that there will possibly be 1 concurrent connections to 
> Redis. This might be a bit to much so could you try reducing that to avoid 
> timeouts?

It's not related to that part. In fact, I commented out the async I/O code
and tested again; the memory usage is almost the same.

And, on the contrary, I need to increase the concurrency number,
because I have millions of aggregation results in total to send per
minute!
If the number is low, it triggers timeouts (yes, even with a timeout
value of 30 seconds; I think it's related to the single-connection
model of the lettuce lib).


Re: about the checkpoint and state backend

2018-01-04 Thread Aljoscha Krettek
Hi,

The path you give to the constructor must be a path on some distributed 
filesystem, otherwise the data will be lost when the local machine crashes, as 
you correctly mentioned.

RocksDB will keep files in a local directory (you can specify this using 
setDbStoragePath()) and when checkpointing will write to the checkpoint 
directory that you specified in the constructor.
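
For illustration, a minimal sketch of that setup (Flink 1.4 API; the paths are
placeholders): checkpoints go to a distributed filesystem, while the working
RocksDB files stay on a local disk of each TaskManager.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The constructor path is where checkpoints (the durable copies) are written,
        // so it must be on a distributed filesystem such as HDFS or S3.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints", true /* incremental */);

        // The live RocksDB instances work against the local disks of each TaskManager.
        backend.setDbStoragePath("file:///data1/flink/rocksdb");

        env.setStateBackend(backend);
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds

        // ... build and execute the job as usual
    }
}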

Best,
Aljoscha


> On 4. Jan 2018, at 14:23, Jinhua Luo  wrote:
> 
> I still do not understand the relationship between rocksdb backend and
> the filesystem (here I refer to any filesystem impl, including local,
> hdfs, s3).
> 
> For example, when I specify the path to rocksdb backend:
> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
> 
> What does it mean?
> 
> Each task manager would save states to /data1/flinkapp on its machine?
> But it seems no sense. Because when one of the machines crashes, the
> job manager could not access the states on dead machine.
> Or, each task manager creates rocksdb instance on temporary path, and
> send snapshots to job manager, then job manager in turn saves them on
> /data1/flinkapp on the job manager's machine?
> 
> Could you give the data flow example?
> 
> And another question is, when I turn off checkpointing (which is also
> default cfg), what happens to the states processing?
> 
> 
> 
> 2018-01-03 0:06 GMT+08:00 Timo Walther :
>> Hi Jinhua,
>> 
>> I will try to answer your questions:
>> 
>> Flink checkpoints the state of each operator. For a Kafka consumer operator
>> this is only the offset. For other operators (such as Windows or a
>> ProcessFunction) the values/list/maps stored in the state are checkpointed.
>> If you are interested in the internals, I would recommend this page [1].
>> Only the MemoryStateBackend sends entire states to the JobManager (see [2]).
>> But you are right, this is a bottleneck and not very fault-tolerant.
>> Usually, Flink assumes to have some distributed file system (such as HDFS)
>> to which each Flink operator can be checkpointed in a fault-tolerant way.
>> For the RocksDbStateBackend the local files are copied to HDFS as well. At
>> the time of writing, only the RocksDBBackend supports incremental
>> checkpoints. The JobManager can then read from HDFS and restore the operator
>> on a different machine.
>> 
>> Feel free to ask further questions.
>> 
>> Regards,
>> Timo
>> 
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
>> 
>> 
>> 
>> Am 1/1/18 um 3:50 PM schrieb Jinhua Luo:
>> 
>>> Hi All,
>>> 
>>> I have two questions:
>>> 
>>> a) does the records/elements themselves would be checkpointed? or just
>>> record offset checkpointed? That is, what data included in the
>>> checkpoint except for states?
>>> 
>>> b) where flink stores the state globally? so that the job manager
>>> could restore them on each task manger at failure restart.
>>> 
>>> For the heap backend, all task managers would send states to job
>>> manager, and job manager would save it in its heap, correct?
>>> 
>>> For the fs/rocksdb backend, all task managers would save states
>>> (incrementally or not) in local path temporarily, and send them (in
>>> rocksdb snapshot format for the rocksdb case?) to the job manager at
>>> checkpoint?
>>> 
>>> The path we used to configure backend is the path on the job manager
>>> machine but not on the task managers' machines? So that's the
>>> bottleneck and single failure point? So it's better to use hdfs path
>>> so that we could scale the storage and make it high availability as
>>> well?
>>> 
>>> Thank you all.
>> 
>> 
>> 



Re: about the checkpoint and state backend

2018-01-04 Thread Jinhua Luo
I still do not understand the relationship between rocksdb backend and
the filesystem (here I refer to any filesystem impl, including local,
hdfs, s3).

For example, when I specify the path to rocksdb backend:
env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));

What does it mean?

Would each task manager save its state to /data1/flinkapp on its own machine?
But that seems to make no sense, because when one of the machines crashes, the
job manager could not access the state on the dead machine.
Or does each task manager create a RocksDB instance on a temporary path and
send snapshots to the job manager, which in turn saves them under
/data1/flinkapp on the job manager's machine?

Could you give an example of the data flow?

And another question: when I turn off checkpointing (which is also the
default config), what happens to state handling?



2018-01-03 0:06 GMT+08:00 Timo Walther :
> Hi Jinhua,
>
> I will try to answer your questions:
>
> Flink checkpoints the state of each operator. For a Kafka consumer operator
> this is only the offset. For other operators (such as Windows or a
> ProcessFunction) the values/list/maps stored in the state are checkpointed.
> If you are interested in the internals, I would recommend this page [1].
> Only the MemoryStateBackend sends entire states to the JobManager (see [2]).
> But you are right, this is a bottleneck and not very fault-tolerant.
> Usually, Flink assumes to have some distributed file system (such as HDFS)
> to which each Flink operator can be checkpointed in a fault-tolerant way.
> For the RocksDbStateBackend the local files are copied to HDFS as well. At
> the time of writing, only the RocksDBBackend supports incremental
> checkpoints. The JobManager can then read from HDFS and restore the operator
> on a different machine.
>
> Feel free to ask further questions.
>
> Regards,
> Timo
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
>
>
>
> Am 1/1/18 um 3:50 PM schrieb Jinhua Luo:
>
>> Hi All,
>>
>> I have two questions:
>>
>> a) does the records/elements themselves would be checkpointed? or just
>> record offset checkpointed? That is, what data included in the
>> checkpoint except for states?
>>
>> b) where flink stores the state globally? so that the job manager
>> could restore them on each task manger at failure restart.
>>
>> For the heap backend, all task managers would send states to job
>> manager, and job manager would save it in its heap, correct?
>>
>> For the fs/rocksdb backend, all task managers would save states
>> (incrementally or not) in local path temporarily, and send them (in
>> rocksdb snapshot format for the rocksdb case?) to the job manager at
>> checkpoint?
>>
>> The path we used to configure backend is the path on the job manager
>> machine but not on the task managers' machines? So that's the
>> bottleneck and single failure point? So it's better to use hdfs path
>> so that we could scale the storage and make it high availability as
>> well?
>>
>> Thank you all.
>
>
>


Re: Flink State monitoring

2018-01-04 Thread Aljoscha Krettek
Hi,

I'm afraid there are currently no metrics around state. I see that they would be 
very good to have, so I'm putting them on my list of things we should have at some 
point.

One thing that comes to mind is checking the size of checkpoints, which gives 
you an indirect way of figuring out how big the state is, but that's not very exact, 
i.e. it doesn't give you "number of keys" or some such.
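
As a rough sketch of that workaround (not from the original thread; host, port,
and job id are placeholders), the web monitor's checkpoint statistics endpoint
can be polled and the reported state size used as a proxy for state growth:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class CheckpointSizeProbe {
    public static void main(String[] args) throws Exception {
        String jobId = "d9ed8f58fceaae253b84fc86e4b6fa3a"; // placeholder job id
        URL url = new URL("http://jobmanager-host:8081/jobs/" + jobId + "/checkpoints");

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
            // The JSON response reports the checkpointed state size per checkpoint;
            // parse it with any JSON library and alert if the size keeps growing.
            System.out.println(body);
        }
    }
}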

Best,
Aljoscha

> On 20. Dec 2017, at 08:09, Netzer, Liron  wrote:
> 
> Ufuk, Thanks for replying !
> 
> Aljoscha, can you please assist with the questions below?
> 
> Thanks,
> Liron
> 
> -Original Message-
> From: Ufuk Celebi [mailto:u...@apache.org] 
> Sent: Friday, December 15, 2017 3:06 PM
> To: Netzer, Liron [ICG-IT]
> Cc: user@flink.apache.org
> Subject: Re: Flink State monitoring
> 
> Hey Liron,
> 
> unfortunately, there are no built-in metrics related to state. In general, 
> exposing the actual values as metrics is problematic, but exposing summary 
> statistics would be a good idea. I'm not aware of a good work around at the 
> moment that would work in the general case (taking into account state 
> restore, etc.).
> 
> Let me pull in Aljoscha (cc'd) who knows the state backend internals well.
> 
> @Aljoscha:
> 1) Are there any plans to expose keyed state related metrics (like number of 
> keys)?
> 2) Is there a way to work around the lack of these metrics in 1.3?
> 
> – Ufuk
> 
> On Thu, Dec 14, 2017 at 10:55 AM, Netzer, Liron  wrote:
>> Hi group,
>> 
>> 
>> 
>> We are using Flink keyed state in several operators.
>> 
>> Is there an easy was to expose the data that is stored in the state, i.e.
>> the key and the values?
>> 
>> This is needed for both monitoring as well as debugging. We would like 
>> to understand how many key+values are stored in each state and also to 
>> view the data itself.
>> 
>> I know that there is the "Queryable state" option, but this is still 
>> in Beta, and doesn't really give us what we want easily.
>> 
>> 
>> 
>> 
>> 
>> *We are using Flink 1.3.2 with Java.
>> 
>> 
>> 
>> Thanks,
>> 
>> Liron



Re: NullPointerException with Avro Serializer

2018-01-04 Thread Aljoscha Krettek
Phew, thanks for letting us know! 😃

And yes, there were some problems with Avro and class loading but I was hoping 
that we got them all before Flink 1.4.0.

Best,
Aljoscha

> On 20. Dec 2017, at 10:54, Kien Truong  wrote:
> 
> It turn out that our flink branch is out-of-date. Sorry for all the noise. :)
> 
> Regards,
> Kien
> 
> Sent from TypeApp 
> On Dec 20, 2017, at 16:42, Kien Truong  > wrote:
> Upon further investigation, we found out that the reason: 
> 
> * The cluster was started on YARN with the hadoop classpath, which includes 
> Avro. Therefore, Avro's SpecificRecord class was loaded using 
> sun.misc.Launcher$AppClassLoader 
> 
> 
> * Our LteSession class was submitted with the application jar, and loaded 
> with the child-first classloader 
> 
> * Flink check if LteSession is assignable to SpecificRecord, which fails. 
> 
> * Flink fall back to Reflection-based avro writer, which throws NPE on null 
> field. 
> 
> If we change the classloader to parent-first, everything is ok. Now the 
> question is why the default doesn't work for us. 
> 
> Best regards, 
> Kien 
> 
> Sent from TypeApp 
> On Dec 20, 2017, at 14:09, Kien Truong < duckientru...@gmail.com 
> > wrote:
> Hi, 
> 
> After upgrading to Flink 1.4, we encounter this exception 
> 
> Caused by: java.lang.NullPointerException: in 
> com.viettel.big4g.avro.LteSession in long null of long in field tmsi of 
> com.viettel.big4g.avro.LteSession 
> at 
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161) 
> at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62) 
> at 
> org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:132)
>  
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:176)
>  
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:48)
>  
> at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>  
> at  org.apache.flink.runtime.io 
> .network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:93)
>  
> at  org.apache.flink.runtime.io 
> .network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:114)
>  
> at  org.apache.flink.runtime.io 
> .network.api.writer.RecordWriter.emit(RecordWriter.java:89)
>  
> at  
> org.apache.flink.streaming.runtime.io
>  
> .StreamRecordWriter.emit(StreamRecordWriter.java:84)
>  
> at  
> org.apache.flink.streaming.runtime.io
>  
> .RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)
>  
> 
> 
> It seems Flink attempts to use the reflection writer instead of the specific 
> writer for this schema. This is wrong, because our LteSession is an Avro 
> object, and should use the specific writer. 
> 
> Best regards, 
> Kien 
> 
> Sent from TypeApp 


Re: Queryable State Client - Actor Not found Exception

2018-01-04 Thread Aljoscha Krettek
Hi,

Is my-machine:52650 the correct address for the JobManager running in YARN? 
Also, Queryable State in Flink 1.3.2 is not easy to get to work when you use 
YARN with HA mode.

Best,
Aljoscha 

> On 3. Jan 2018, at 16:02, Velu Mitwa  wrote:
> 
> Hi,
> I am running a Flink Job which uses the Queryable State feature of Apache 
> Flink(1.3.2). I was able to do that in local mode. When I try to do that in 
> Cluster mode (Yarn Session), I am getting Actor not found Exception. 
> 
> Please help me to understand what is missing. 
> 
> Exception Trace
> 
> Query failed because of the following Exception:
> akka.actor.ActorNotFound: Actor not found for: 
> ActorSelection[Anchor(akka.tcp://flink@my-machine:52650/), 
> Path(/user/jobmanager)]
> at 
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
> at 
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
> at 
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
> at 
> akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
> at 
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
> at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> at 
> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
> at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
> at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> at java.lang.Thread.run(Thread.java:745)
> 
> Client Creation Snippet 
> 
>  Configuration config = new Configuration();
> config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
> jobManagerHost);
> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
> jobManagerPort);
> 
> final HighAvailabilityServices highAvailabilityServices = 
> HighAvailabilityServicesUtils
> .createHighAvailabilityServices(config, 
> Executors.newSingleThreadScheduledExecutor(),
> 
> HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
> 
> this.client = new QueryableStateClient(config, highAvailabilityServices);
>   }
> 



Re: keyby() issue

2018-01-04 Thread Aljoscha Krettek
Memory usage should grow linearly with the number of windows you have active at 
any given time, the number of keys and the number of different window 
operations you have.

Regarding the async I/O writing to Redis, I see that you give a capacity of 
1 which means that there will possibly be 1 concurrent connections to 
Redis. This might be a bit too much, so could you try reducing that to avoid 
timeouts?
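
To make the capacity/timeout knobs concrete, here is a minimal sketch (Flink 1.4
async I/O API; RedisLookup and the lookup logic are made up, a real job would use
a non-blocking Redis client): the capacity bounds how many requests can be in
flight at once, and the timeout also covers the time a record spends waiting for
a free slot in the operator's queue.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncCapacityExample {

    // Hypothetical lookup; completes the result future when the (fake) reply arrives.
    static class RedisLookup extends RichAsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
            CompletableFuture
                .supplyAsync(() -> "value-for-" + key)
                .thenAccept(v -> resultFuture.complete(Collections.singleton(v)));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.fromElements("a", "b", "c");

        // timeout = 30s, capacity = 100: at most 100 requests in flight; a lower
        // capacity means more back-pressure but fewer concurrent Redis requests.
        DataStream<String> enriched = AsyncDataStream.unorderedWait(
            input, new RedisLookup(), 30, TimeUnit.SECONDS, 100);

        enriched.print();
        env.execute("async-io-capacity-sketch");
    }
}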

> On 4. Jan 2018, at 09:39, Jinhua Luo  wrote:
> 
> The app is very simple, please see the code snippet:
> 
> https://gist.github.com/kingluo/e06381d930f34600e42b050fef6baedd
> 
> 
> I rerun the app, but it's weird that it can continuously produce the
> results now.
> 
> But it have two new issues:
> a) memory usage too high, it uses about 8 GB heap memory! why? Because
> the traffic is too high?
> b) the redis async io is likely to be timedout and fails the whole pipeline.
> 
> 
> 
> 2018-01-03 0:41 GMT+08:00 Timo Walther :
>> Hi Jinhua,
>> 
>> did you check the key group assignments? What is the distribution of
>> "MathUtils.murmurHash(keyHash) % maxParallelism" on a sample of your data?
>> This also depends on the hashCode on the output of your KeySelector.
>> 
>> keyBy should handle high traffic well, but it is designed for key spaces
>> with thousands or millions of values. If this is not the case, you need to
>> introduce some more artifical key to spread the load more evenly.
>> 
>> Regarding your OutOfMemoryError: I think you producing elements much faster
>> than the following operators after keyBy process/discard the elements. Can
>> you explain us your job in more detail? Are you using event-time? How do you
>> aggregate elements of the windows?
>> 
>> Regards,
>> Timo
>> 
>> 
>> 
>> Am 1/1/18 um 6:00 AM schrieb Jinhua Luo:
>> 
>>> I checked the logs, but no information indicates what happens.
>>> 
>>> In fact, in the same app, there is another stream, but its kafka
>>> source is low traffic, and I aggregate some field of that source too,
>>> and flink gives correct results continuously.
>>> So I doubt if keyby() could not handle high traffic well (which
>>> affects the number of keys in the key partitions).
>>> 
>>> 2018-01-01 2:04 GMT+08:00 Steven Wu :
> 
>  but soon later, no results produced, and flink seems busy doing
> something
> forever.
 
 Jinhua, don't know if you have checked these things. if not, maybe worth
 a
 look.
 
 have you tried to do a thread dump?
 How is the GC pause?
 do you see flink restart? check the exception tab in Flink web UI for
 your
 job.
 
 
 
 On Sun, Dec 31, 2017 at 6:20 AM, Jinhua Luo  wrote:
> 
> I take time to read some source codes about the keyed stream
> windowing, and I make below understanding:
> 
> a) the keyed stream would be split and dispatched to downstream tasks
> in hash manner, and the hash base is the parallelism of the downstream
> operator:
> 
> See
> 
> org.apache.flink.runtime.state.KeyGroupRangeAssignment.computeKeyGroupForKeyHash(int,
> int):
> MathUtils.murmurHash(keyHash) % maxParallelism;
> 
> That's what the doc said "hash partitioning".
> 
> So the compiled execution graph already determines whose operator
> instance receive which key groups.
> 
> b) with windowing, the key is used to index window states, so the
> window function would receive the deserialized value from its
> corresponding window state of some key.
> 
> b.1) The element would be added into the state first:
> 
> See
> 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(StreamRecord):
> windowState.add(element.getValue());
> 
> b.2) when the trigger fires the window, the value would be
> deserialized from the keyed state:
> 
> ACC contents = windowState.get();
> emitWindowContents(actualWindow, contents);
> 
> For rocksdb backend, each input element would be taken back and forth
> from the disk in the processing.
> 
> flink's keyed stream has the same functionality as storm's field
> grouping, and more complicated.
> 
> Am I correct?
> 
> 
> But I still could not understand why keyby() stops flink from
> returning expected results.
> 
> Let me explain my case more:
> I use kafka data source, which collects log lines of log files from
> tens of machines.
> The log line is in json format, which contains the "ip" field, the ip
> address of the user, so it could be valued in million of ip addresses
> of the Internet.
> The stream processing is expected to result in ip aggregation in {1
> hour, 1 min} sliding window.
> 
> If I use keyBy("ip"), then at first minutes, the flink could give me
> correct aggregation results, but soon later, no results produced, and
> flink seems busy doing something forever.
> 
> I dou

Re: Job Manager not able to fetch job info when restarted

2018-01-04 Thread Sushil Ks
Oh okay! Thanks!

On Jan 4, 2018 2:47 PM, "Stefan Richter" 
wrote:

> Yes, HA is required for what you want to do. I am not aware of an alert
> mechanism in Flink, but would assume that this is something that should
> better be solved on the YARN level?
>
> Best,
> Stefan
>
> Am 03.01.2018 um 19:27 schrieb Sushil Ks :
>
> Hi Stefan,
>  It was just ran without HA, via yarn. Will try running a yarn
> session with HA and test, thanks for you time. Also is there a way we can
> get alerts if a pipeline restarts or gets cancelled in Flink?
>
> Regards,
> Sushil Ks
>
> On Jan 3, 2018 7:07 PM, "Stefan Richter" 
> wrote:
>
>> Hi,
>>
>> did you configure high availability with Zookeeper, as described here:
>> https://ci.apache.org/projects/flink/flink-docs-releas
>> e-1.3/setup/jobmanager_high_availability.html ? This should bring the
>> desired behaviour.
>>
>> Best,
>> Stefan
>>
>> Am 03.01.2018 um 14:04 schrieb Sushil Ks :
>>
>> Hi,
>>   We are launching Flink on Yarn job, here whenever the job manager
>> gets restarted, the previous task that was running doesn't get restarted
>> instead it shows no jobs running. Is there a way to resume previous running
>> task, when job manager restarts.
>>
>> Regards,
>> Sushil Ks
>>
>>
>>
>


Re: Can't call getProducedType on Avro messages with array types

2018-01-04 Thread Aljoscha Krettek
Hi,

I think you might be able to use AvroTypeInfo, which is available if you include 
the flink-avro dependency. Is that an option for you?

Best,
Aljoscha

> On 3. Jan 2018, at 21:34, Kyle Hamlin  wrote:
> 
> Hi,
> 
> It appears that Kryo can't properly extract/deserialize Avro array types. I 
> have a very simple Avro schema that has an array type and when I remove the 
> array field the error is not thrown. Is there any way around this without 
> using a specific type?
> 
> Avro Schema:
> {
> "type": "record",
> "name": "Recieved",
> "fields": [
> {"name": "id", "type": "int"},
> {"name": "time", "type": "long"},
> {"name": "urls", "type": {"type": "array", "items": "string"}},
> ]
> }
> 
> Deserializer:
> import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, 
> KafkaAvroDeserializer}
> import org.apache.avro.generic.{GenericData, GenericRecord}
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.api.java.typeutils.TypeExtractor
> import 
> org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
> 
> import scala.collection.JavaConverters._
> import scala.reflect.ClassTag
> 
> class AvroDeserializer[T <: GenericRecord : ClassTag](schemaRegistryUrl: 
> String) extends KeyedDeserializationSchema[T] {
> 
>   @transient lazy val keyDeserializer: KafkaAvroDeserializer = {
> val deserializer = new KafkaAvroDeserializer()
> deserializer.configure(
>   Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> 
> schemaRegistryUrl).asJava,
>   true)
> deserializer
>   }
> 
>   // Flink needs the serializer to be serializable => this "@transient lazy 
> val" does the trick
>   @transient lazy val valueDeserializer: KafkaAvroDeserializer = {
> val deserializer = new KafkaAvroDeserializer()
> deserializer.configure(
>   // other schema-registry configuration parameters can be passed, see 
> the configure() code
>   // for details (among other things, schema cache size)
>   Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> 
> schemaRegistryUrl).asJava,
>   false)
> deserializer
>   }
> 
>   override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>topic: String, partition: Int, offset: Long): T = {
> valueDeserializer.deserialize(topic, message).asInstanceOf[T]
>   }
> 
>   override def isEndOfStream(nextElement: T): Boolean = false
> 
>   override def getProducedType: TypeInformation[T] = {
> 
> TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
>   }
> 
> }
> Stacktrace:
> Cluster configuration: Standalone cluster with JobManager at 
> localhost/127.0.0.1:6123 
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8082 
> 
> Starting execution of program
> Submitting job with JobID: d9ed8f58fceaae253b84fc86e4b6fa3a. Waiting for job 
> completion.
> Connected to JobManager at 
> Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with 
> leader session id ----.
> 01/03/2018 15:19:57   Job execution switched to status RUNNING.
> 01/03/2018 15:19:57   Source: Kafka -> Sink: Unnamed(1/1) switched to 
> SCHEDULED
> 01/03/2018 15:19:57   Source: Kafka -> Sink: Unnamed(1/1) switched to 
> DEPLOYING
> 01/03/2018 15:19:57   Source: Kafka -> Sink: Unnamed(1/1) switched to RUNNING
> 01/03/2018 15:19:59   Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> values (org.apache.avro.generic.GenericData$Record)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext

Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

2018-01-04 Thread Aljoscha Krettek
I think this might be happening because partial Hadoop dependencies are in the 
user jar and the rest is only available from the Hadoop deps that come bundled 
with Flink. For example, I noticed that you have Hadoop-common as a dependency 
which probably ends up in your Jar.

> On 4. Jan 2018, at 11:40, Stephan Ewen  wrote:
> 
> Hi!
> 
> This looks indeed like a class-loading issue - it looks like "RpcEngine" and 
> "ProtobufRpcEngine" are loaded via different classloaders.
> 
> Can you try the following:
> 
>   - In your flink-conf.yml, set classloader.resolve-order: parent-first
> 
> If that fixes the issue, then we can look at a way to make this seamless...
> 
> On Wed, Jan 3, 2018 at 8:31 PM, Kyle Hamlin  > wrote:
> Hello,
> 
> After moving to Flink 1.4.0 I'm getting the following error. I can't find 
> anything online that addresses it. Is it a Hadoop dependency issue? Here are 
> my project dependencies: 
> 
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>   "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
>   "org.apache.flink" % "flink-metrics-core" % flinkVersion,
>   "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
>   "org.apache.kafka" %% "kafka" % "0.10.0.1",
>   "org.apache.avro" % "avro" % "1.7.7",
>   "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
>   "org.apache.parquet" % "parquet-avro" % "1.8.1",
>   "io.confluent" % "kafka-avro-serializer" % "3.2.0",
>   "org.apache.hadoop" % "hadoop-common" % "3.0.0"
> )
> Stacktrace:
> Cluster configuration: Standalone cluster with JobManager at 
> localhost/127.0.0.1:6123 
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8082 
> 
> Starting execution of program
> Submitting job with JobID: b6ed965410dad61f96f8dec73b614a9f. Waiting for job 
> completion.
> Connected to JobManager at 
> Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with 
> leader session id ----.
> 01/03/2018 14:20:52   Job execution switched to status RUNNING.
> 01/03/2018 14:20:52   Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
> 01/03/2018 14:20:52   Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
> 01/03/2018 14:20:53   Source: Kafka -> Sink: S3(1/1) switched to RUNNING
> 01/03/2018 14:20:53   Source: Kafka -> Sink: S3(1/1) switched to FAILED
> java.lang.RuntimeException: Error while creating FileSystem when initializing 
> the state of the BucketingSink.
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot instantiate file system for URI: 
> hdfs://localhost:12345/
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>   at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>   ... 9 more
> Caused by: java.lang.ClassCastException: 
> org.apache.hadoop.ipc.ProtobufRpcEngine cannot be cast to 
> org.apache.hadoop.ipc.RpcEngine
>   at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:207)
>   at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
>   at 
> org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
>   at 
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
>   at 
> org.a

Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

2018-01-04 Thread Stephan Ewen
@Kyle:

Please also check if you have any Hadoop classes in your user jar. There
should be none; Hadoop should only be on the Flink classpath.
Fixing the project Maven setup (making sure the Hadoop and Flink core
dependencies are marked as provided) should work.

To do that, you can, for example, use the latest quickstart template from
Flink 1.4.

On Thu, Jan 4, 2018 at 11:40 AM, Stephan Ewen  wrote:

> Hi!
>
> This looks indeed like a class-loading issue - it looks like "RpcEngine"
> and "ProtobufRpcEngine" are loaded via different classloaders.
>
> Can you try the following:
>
>   - In your flink-conf.yml, set classloader.resolve-order: parent-first
>
> If that fixes the issue, then we can look at a way to make this seamless...
>
> On Wed, Jan 3, 2018 at 8:31 PM, Kyle Hamlin  wrote:
>
>> Hello,
>>
>> After moving to Flink 1.4.0 I'm getting the following error. I can't find
>> anything online that addresses it. Is it a Hadoop dependency issue? Here
>> are my project dependencies:
>>
>> libraryDependencies ++= Seq(
>>   "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
>>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>>   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>>   "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
>>   "org.apache.flink" % "flink-metrics-core" % flinkVersion,
>>   "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
>>   "org.apache.kafka" %% "kafka" % "0.10.0.1",
>>   "org.apache.avro" % "avro" % "1.7.7",
>>   "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
>>   "org.apache.parquet" % "parquet-avro" % "1.8.1",
>>   "io.confluent" % "kafka-avro-serializer" % "3.2.0",
>>   "org.apache.hadoop" % "hadoop-common" % "3.0.0"
>> )
>>
>> *Stacktrace:*
>> Cluster configuration: Standalone cluster with JobManager at localhost/
>> 127.0.0.1:6123
>> Using address localhost:6123 to connect to JobManager.
>> JobManager web interface address http://localhost:8082
>> Starting execution of program
>> Submitting job with JobID: b6ed965410dad61f96f8dec73b614a9f. Waiting for
>> job completion.
>> Connected to JobManager at Actor[akka.tcp://flink@localho
>> st:6123/user/jobmanager#-1321297259] with leader session id
>> ----.
>> 01/03/2018 14:20:52 Job execution switched to status RUNNING.
>> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
>> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
>> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to RUNNING
>> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to FAILED
>> java.lang.RuntimeException: Error while creating FileSystem when
>> initializing the state of the BucketingSink.
>> at org.apache.flink.streaming.connectors.fs.bucketing.Bucketing
>> Sink.initializeState(BucketingSink.java:358)
>> at org.apache.flink.streaming.util.functions.StreamingFunctionU
>> tils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>> at org.apache.flink.streaming.util.functions.StreamingFunctionU
>> tils.restoreFunctionState(StreamingFunctionUtils.java:160)
>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOp
>> erator.initializeState(AbstractUdfStreamOperator.java:96)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOpera
>> tor.initializeState(AbstractStreamOperator.java:259)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.initiali
>> zeOperators(StreamTask.java:694)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.initiali
>> zeState(StreamTask.java:682)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(S
>> treamTask.java:253)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: Cannot instantiate file system for URI:
>> hdfs://localhost:12345/
>> at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(Hado
>> opFsFactory.java:187)
>> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(F
>> ileSystem.java:401)
>> at org.apache.flink.streaming.connectors.fs.bucketing.Bucketing
>> Sink.createHadoopFileSystem(BucketingSink.java:1154)
>> at org.apache.flink.streaming.connectors.fs.bucketing.Bucketing
>> Sink.initFileSystem(BucketingSink.java:411)
>> at org.apache.flink.streaming.connectors.fs.bucketing.Bucketing
>> Sink.initializeState(BucketingSink.java:355)
>> ... 9 more
>> Caused by: java.lang.ClassCastException: 
>> org.apache.hadoop.ipc.ProtobufRpcEngine
>> cannot be cast to org.apache.hadoop.ipc.RpcEngine
>> at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:207)
>> at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
>> at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClie
>> ntProtocol(NameNodeProxies.java:418)
>> at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(Name
>> NodeProxies.java:314)
>> at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeP
>> roxies.java:176)
>> at org.apache.

Re: Flink CEP with event time

2018-01-04 Thread Aljoscha Krettek
Yes, because event-time only advances if something makes it advance. Basically.
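
If no further events arrive, one way to still let event time advance is a
periodic watermark assigner that falls back to processing time after an idle
period. A rough sketch (assuming the Event type and origTimestamp field from
the quoted code below; the idle fallback is a trade-off and can mark very late
events as late):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class BoundedOutOfOrdernessWithIdleness(maxOutOfOrdernessMs: Long, idleTimeoutMs: Long)
    extends AssignerWithPeriodicWatermarks[Event] {

  private var maxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs
  private var lastEventSeenAt: Long = System.currentTimeMillis()

  override def extractTimestamp(event: Event, previousElementTimestamp: Long): Long = {
    val ts = event.origTimestamp.getOrElse("0").toLong
    if (ts > maxTimestamp) maxTimestamp = ts
    lastEventSeenAt = System.currentTimeMillis()
    ts
  }

  // called at the auto-watermark interval, even when no new events arrive
  override def getCurrentWatermark(): Watermark = {
    if (System.currentTimeMillis() - lastEventSeenAt > idleTimeoutMs) {
      // source has been idle: let the watermark advance with processing time
      new Watermark(System.currentTimeMillis() - maxOutOfOrdernessMs)
    } else {
      new Watermark(maxTimestamp - maxOutOfOrdernessMs)
    }
  }
}

// e.g. kafkaSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessWithIdleness(10000, 30000))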

> On 4. Jan 2018, at 11:34, shashank agarwal  wrote:
> 
> But this will be wrong in my case. So I have to wait for the results until I 
> receive next event.
> 
> 
> 
> 
> On Thu, Jan 4, 2018 at 3:53 PM, Aljoscha Krettek wrote:
> Think this is actually working as intended, from your earlier description of 
> when results are produced: When you see Event 1.B, the watermark is not 
> sufficiently advanced to trigger computation, only when you see Event 2.A 
> does the watermark advance and you get a result. This is what I would expect 
> to happen.
> 
> 
>> On 3. Jan 2018, at 19:46, shashank agarwal wrote:
>> 
>> @Dawid, I was using 1.3.2, I have checked on 1.4.0 also still facing the 
>> same issue.
>> 
>> 
>> @Aljoscha, I have to cover the case where B can come after A from Kafka. How 
>> I can achieve this as Event Time is not working. How should I implement this?
>> 
>>  A followedBy B.
>> 
>> As I am using kafka source and my event API's using load balancers so 
>> sometimes B comes before A. So my CEP doesn't generate any result for those 
>> events. 
>> 
>> I am trying to use Event time like this. Am I am doing anything wrong?
>> 
>> 
>>  kafkaSource.assignTimestampsAndWatermarks(
>> new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) 
>> {
>>   override def extractTimestamp(event: Event): Long = {
>> try {
>>   val originTime = event.origTimestamp.getOrElse("0").toLong
>>   if(originTime <= 0)
>> {
>>   val serverTime = 
>> event.serverTimestamp.getOrElse("0").toLong
>>   if(serverTime <= 0)
>> {
>>   System.currentTimeMillis()
>> }
>>   else
>> {
>>   serverTime
>> }
>> }
>>   else {
>> originTime
>>   }
>> }
>> catch {
>>   case e: Exception => Log.error("OriginTimestamp Exception occurred: " + e.getMessage)
>> System.currentTimeMillis()
>> }
>>   }
>> }
>>   )
>> 
>> On Wed, Jan 3, 2018 at 9:42 PM, Dawid Wysakowicz wrote:
>> Hi shashank,
>> 
>> What version of flink are you using? Is it possible that you are hitting 
>> this issue: https://issues.apache.org/jira/browse/FLINK-7563 ?
>> 
>> Watermark semantics in CEP was buggy and events were processed only if its 
>> timestamp was lower than current watermark while it should be lower or equal.
>> 
>> Best
>> Dawid
>> 
>> > On 3 Jan 2018, at 17:05, shashank agarwal wrote:
>> >
>> > ssed A with origTimestamp Y. (
>> 
>> 
>> 
>> 
>> -- 
>> Thanks Regards
>> 
>> SHASHANK AGARWAL
>>  ---  Trying to mobilize the things
>> 
> 
> 
> 
> 
> -- 
> Thanks Regards
> 
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things



Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

2018-01-04 Thread Stephan Ewen
Hi!

This looks indeed like a class-loading issue - it looks like "RpcEngine"
and "ProtobufRpcEngine" are loaded via different classloaders.

Can you try the following:

  - In your flink-conf.yml, set classloader.resolve-order: parent-first

If that fixes the issue, then we can look at a way to make this seamless...

On Wed, Jan 3, 2018 at 8:31 PM, Kyle Hamlin  wrote:

> Hello,
>
> After moving to Flink 1.4.0 I'm getting the following error. I can't find
> anything online that addresses it. Is it a Hadoop dependency issue? Here
> are my project dependencies:
>
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>   "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
>   "org.apache.flink" % "flink-metrics-core" % flinkVersion,
>   "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
>   "org.apache.kafka" %% "kafka" % "0.10.0.1",
>   "org.apache.avro" % "avro" % "1.7.7",
>   "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
>   "org.apache.parquet" % "parquet-avro" % "1.8.1",
>   "io.confluent" % "kafka-avro-serializer" % "3.2.0",
>   "org.apache.hadoop" % "hadoop-common" % "3.0.0"
> )
>
> *Stacktrace:*
> Cluster configuration: Standalone cluster with JobManager at localhost/
> 127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8082
> Starting execution of program
> Submitting job with JobID: b6ed965410dad61f96f8dec73b614a9f. Waiting for
> job completion.
> Connected to JobManager at Actor[akka.tcp://flink@localho
> st:6123/user/jobmanager#-1321297259] with leader session id
> ----.
> 01/03/2018 14:20:52 Job execution switched to status RUNNING.
> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to RUNNING
> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to FAILED
> java.lang.RuntimeException: Error while creating FileSystem when
> initializing the state of the BucketingSink.
> at org.apache.flink.streaming.connectors.fs.bucketing.Bucketing
> Sink.initializeState(BucketingSink.java:358)
> at org.apache.flink.streaming.util.functions.StreamingFunctionU
> tils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> at org.apache.flink.streaming.util.functions.StreamingFunctionU
> tils.restoreFunctionState(StreamingFunctionUtils.java:160)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOp
> erator.initializeState(AbstractUdfStreamOperator.java:96)
> at org.apache.flink.streaming.api.operators.AbstractStreamOpera
> tor.initializeState(AbstractStreamOperator.java:259)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initiali
> zeOperators(StreamTask.java:694)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initiali
> zeState(StreamTask.java:682)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot instantiate file system for URI:
> hdfs://localhost:12345/
> at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(Hado
> opFsFactory.java:187)
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(F
> ileSystem.java:401)
> at org.apache.flink.streaming.connectors.fs.bucketing.Bucketing
> Sink.createHadoopFileSystem(BucketingSink.java:1154)
> at org.apache.flink.streaming.connectors.fs.bucketing.Bucketing
> Sink.initFileSystem(BucketingSink.java:411)
> at org.apache.flink.streaming.connectors.fs.bucketing.Bucketing
> Sink.initializeState(BucketingSink.java:355)
> ... 9 more
> Caused by: java.lang.ClassCastException: 
> org.apache.hadoop.ipc.ProtobufRpcEngine
> cannot be cast to org.apache.hadoop.ipc.RpcEngine
> at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:207)
> at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
> at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClie
> ntProtocol(NameNodeProxies.java:418)
> at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(Name
> NodeProxies.java:314)
> at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeP
> roxies.java:176)
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
> at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(Dist
> ributedFileSystem.java:149)
> at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(Hado
> opFsFactory.java:159)
> ... 13 more
>


Re: Flink CEP with event time

2018-01-04 Thread shashank agarwal
But this will be wrong in my case. So I have to wait for the results until
I receive the next event.




On Thu, Jan 4, 2018 at 3:53 PM, Aljoscha Krettek 
wrote:

> Think this is actually working as intended, from your earlier description
> of when results are produced: When you see Event 1.B, the watermark is not
> sufficiently advanced to trigger computation, only when you see Event 2.A
> does the watermark advance and you get a result. This is what I would
> expect to happen.
>
>
> On 3. Jan 2018, at 19:46, shashank agarwal  wrote:
>
> @Dawid, I was using 1.3.2, I have checked on 1.4.0 also still facing the
> same issue.
>
>
> @Aljoscha, I have to cover the case where B can come after A from Kafka.
> How I can achieve this as Event Time is not working. How should I implement
> this?
>
>  A followedBy B.
>
> As I am using kafka source and my event API's using load balancers so
> sometimes B comes before A. So my CEP doesn't generate any result for those
> events.
>
> I am trying to use Event time like this. Am I am doing anything wrong?
>
>
>  kafkaSource.assignTimestampsAndWatermarks(
> new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10))
> {
>   override def extractTimestamp(event: Event): Long = {
> try {
>   val originTime = event.origTimestamp.getOrElse("0").toLong
>   if(originTime <= 0)
> {
>   val serverTime = event.serverTimestamp.
> getOrElse("0").toLong
>   if(serverTime <= 0)
> {
>   System.currentTimeMillis()
> }
>   else
> {
>   serverTime
> }
> }
>   else {
> originTime
>   }
> }
> catch {
>   case e: Exception => Log.error("OriginTimestamp Exception occurred: " + e.getMessage)
> System.currentTimeMillis()
> }
>   }
> }
>   )
>
> On Wed, Jan 3, 2018 at 9:42 PM, Dawid Wysakowicz <
> wysakowicz.da...@gmail.com> wrote:
>
>> Hi shashank,
>>
>> What version of flink are you using? Is it possible that you are hitting
>> this issue: https://issues.apache.org/jira/browse/FLINK-7563 ?
>>
>> Watermark semantics in CEP was buggy and events were processed only if
>> its timestamp was lower than current watermark while it should be lower or
>> equal.
>>
>> Best
>> Dawid
>>
>> > On 3 Jan 2018, at 17:05, shashank agarwal 
>> wrote:
>> >
>> > ssed A with origTimestamp Y. (
>>
>>
>
>
> --
> Thanks Regards
>
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things
>
>
>


-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things


RE: Lower Parallelism derives better latency

2018-01-04 Thread Netzer, Liron
Hi,
Yes, one machine. Only local connections.
CPU wasn't really affected by the parallelism changes; the CPU consumption was
~28%.
I'm finding out whether I'm allowed to send the code and will update soon.

Thanks,
Liron

From: Stephan Ewen [mailto:se...@apache.org]
Sent: Thursday, January 04, 2018 12:20 PM
To: Stefan Richter
Cc: Netzer, Liron [ICG-IT]; user@flink.apache.org
Subject: Re: Lower Parallelism derives better latency

Just to make sure:

  - This runs on one machine, so only local connections?


On Thu, Jan 4, 2018 at 10:47 AM, Stefan Richter wrote:
Hi,

ok, throughput sounds good then, and I assume there is also no unexpected 
increase in CPU usage? For the code example, maybe it is possible to minimize 
the code (dropping all the confidential business logic, simple generator 
sources,…) , while still keeping the general shape of the job intact?

This is the first report for a problem like this that I am aware of. And 
unfortunately, there could be many factors. For example, if you are using event 
time, could changing the parallelism impact your watermark progression and 
therefor the time it takes to trigger your windows?

Best,
Stefan



Am 04.01.2018 um 10:30 schrieb Netzer, Liron:

Hi Stefan,
The throughput was the same in all of the executions. This was well validated 
in each test, as this is what I also suspected that can be effected.
The throughput was ~110,000 messages per second.
Regarding the code example, this is a bit confidential, let me think what I can 
do and get back to you.
Am I the first one who encountered such an issue?

Thanks,
Liron


From: Stefan Richter [mailto:s.rich...@data-artisans.com]
Sent: Thursday, January 04, 2018 11:15 AM
To: Netzer, Liron [ICG-IT]
Cc: user@flink.apache.org
Subject: Re: Lower Parallelism derives better latency

Hi,

ok that would have been good to know, so forget about my explanation attempt 
:-). This makes it interesting, and at the same time I cannot come up with an 
„easy“ explanation. It is not even clear if the reason for this is a general 
problem in Flink, your setup, or caused by something that your job is doing. 
Two more questions: What happens to the throughput in that experiment? Does it 
also decrease or increase? I just want to rule out that some general overhead 
is introduced. Second, do you have or could you create some (minimal) code 
example to reproduce the problem that you could share with us (of course you 
can also share this in privat)? This would be very helpful!

Best,
Stefan

Am 04.01.2018 um 08:45 schrieb Netzer, Liron:

Hi Stefan,
Thanks for replying.

All of the tests below were executed with a buffer timeout of zero:

 env.setBufferTimeout(0);
so this means that the buffers were flushed after each record.

Any other explanation? ☺

Thanks,
Liron


From: Stefan Richter [mailto:s.rich...@data-artisans.com]
Sent: Wednesday, January 03, 2018 3:20 PM
To: Netzer, Liron [ICG-IT]
Cc: user@flink.apache.org
Subject: Re: Lower Parallelism derives better latency

Hi,

one possible explanation that I see is the following: in a shuffle, each there 
are input and output buffers for each parallel subtask to which data could be 
shuffled. Those buffers are flushed either when full or after a timeout 
interval. If you increase the parallelism, there are more buffers and each 
buffer gets a smaller fraction of the data. This, in turn, means that it takes 
longer until an individual buffer is full and data is emitted. The timeout 
interval enforces an upper bound.

Your experiments works on a very small scale, and I would not assume that this 
would increase latency without bounds - at least once you hit the buffer 
timeout interval the latency should no longer increase. You could validate this 
by configuring smaller buffer sizes and test how this impacts the experiment.

Best,
Stefan
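
For reference, the knob discussed above is set per job; a minimal sketch
(Scala API, semantics as documented for setBufferTimeout):

import org.apache.flink.streaming.api.scala._

object BufferTimeoutExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 0  -> flush output buffers after every record (lowest latency, more per-record overhead)
    // -1 -> flush only when a buffer is full (highest throughput)
    // n  -> flush at the latest every n milliseconds
    env.setBufferTimeout(0)
  }
}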


Am 03.01.2018 um 08:13 schrieb Netzer, Liron:

Hi group,

We have a standalone Flink cluster that is running on a UNIX host with 40 CPUs 
and 256GB RAM.
There is one task manager and 24 slots were defined.
When we decrease the parallelism of the Stream graph operators(each operator 
has the same parallelism),
we see a consistent change in the latency, it gets better:
Test run | Parallelism | 99 percentile | 95 percentile | 75 percentile | Mean
#1       | 8           | 4.15 ms       | 2.02 ms       | 0.22 ms       | 0.42 ms
#2       | 7           | 3.6 ms        | 1.68 ms       | 0.14 ms       | 0.34 ms
#3       | 6           | 3 ms          | 1.4 ms        | 0.13 ms       | 0.3 ms
#4       | 5           | 2.1 ms        | 0.95 ms       | 0.1 ms        | 0.22 ms
#5       | 4           | 1.5 ms        | 0.64 ms       | 0.09 ms       | 0.16 ms


This was a surprise for us, as we expected that higher parallelism will derive 
better latency.
Could you try to assist us to understand this behavior?
I know that when there are more threads that are involved, there is probably 
more serialization/deserialization, but this can't be the only reason for this 
behavior.

W

Re: Flink CEP with event time

2018-01-04 Thread Aljoscha Krettek
I think this is actually working as intended, given your earlier description of
when results are produced: when you see Event 1.B, the watermark is not
sufficiently advanced to trigger the computation; only when you see Event 2.A
does the watermark advance and you get a result. This is what I would expect to
happen.
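
For context, a rough sketch of the pattern under discussion (Scala CEP API; the
Event shape, key selector and time bound are made up): with event time, the
match for A followedBy B is only emitted once the watermark has passed the
matched events' timestamps.

import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class Event(eventType: String, userId: String)  // hypothetical shape

object FollowedByExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // stand-in for the Kafka source with timestamps/watermarks already assigned
    val events: DataStream[Event] = ???

    // B may arrive out of order, so the watermark decides when a match fires
    val pattern = Pattern.begin[Event]("A").where(_.eventType == "A")
      .followedBy("B").where(_.eventType == "B")
      .within(Time.minutes(5))

    CEP.pattern(events.keyBy(_.userId), pattern)
      .select(m => (m("A").head, m("B").head))
      .print()

    env.execute("followedBy sketch")
  }
}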


> On 3. Jan 2018, at 19:46, shashank agarwal  wrote:
> 
> @Dawid, I was using 1.3.2, I have checked on 1.4.0 also still facing the same 
> issue.
> 
> 
> @Aljoscha, I have to cover the case where B can come after A from Kafka. How 
> I can achieve this as Event Time is not working. How should I implement this?
> 
>  A followedBy B.
> 
> As I am using kafka source and my event API's using load balancers so 
> sometimes B comes before A. So my CEP doesn't generate any result for those 
> events. 
> 
> I am trying to use Event time like this. Am I am doing anything wrong?
> 
> 
>  kafkaSource.assignTimestampsAndWatermarks(
> new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
>   override def extractTimestamp(event: Event): Long = {
> try {
>   val originTime = event.origTimestamp.getOrElse("0").toLong
>   if(originTime <= 0)
> {
>   val serverTime = event.serverTimestamp.getOrElse("0").toLong
>   if(serverTime <= 0)
> {
>   System.currentTimeMillis()
> }
>   else
> {
>   serverTime
> }
> }
>   else {
> originTime
>   }
> }
> catch {
>   case e: Exception => Log.error("OriginTimestamp Exception occurred: " + e.getMessage)
> System.currentTimeMillis()
> }
>   }
> }
>   )
> 
> On Wed, Jan 3, 2018 at 9:42 PM, Dawid Wysakowicz wrote:
> Hi shashank,
> 
> What version of flink are you using? Is it possible that you are hitting this 
> issue: https://issues.apache.org/jira/browse/FLINK-7563 ?
> 
> Watermark semantics in CEP was buggy and events were processed only if its 
> timestamp was lower than current watermark while it should be lower or equal.
> 
> Best
> Dawid
> 
> > On 3 Jan 2018, at 17:05, shashank agarwal wrote:
> >
> > ssed A with origTimestamp Y. (
> 
> 
> 
> 
> -- 
> Thanks Regards
> 
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things
> 



Re: Lower Parallelism derives better latency

2018-01-04 Thread Stephan Ewen
Just to make sure:

  - This runs on one machine, so only local connections?


On Thu, Jan 4, 2018 at 10:47 AM, Stefan Richter  wrote:

> Hi,
>
> ok, throughput sounds good then, and I assume there is also no unexpected
> increase in CPU usage? For the code example, maybe it is possible to
> minimize the code (dropping all the confidential business logic, simple
> generator sources,…) , while still keeping the general shape of the job
> intact?
>
> This is the first report for a problem like this that I am aware of. And
> unfortunately, there could be many factors. For example, if you are using
> event time, could changing the parallelism impact your watermark
> progression and therefor the time it takes to trigger your windows?
>
> Best,
> Stefan
>
>
> Am 04.01.2018 um 10:30 schrieb Netzer, Liron :
>
> Hi Stefan,
> The throughput was the same in all of the executions. This was well
> validated in each test, as this is what I also suspected that can be
> effected.
> The throughput was ~110,000 messages per second.
> Regarding the code example, this is a bit confidential, let me think what
> I can do and get back to you.
> Am I the first one who encountered such an issue?
>
> Thanks,
> Liron
>
>
> *From:* Stefan Richter [mailto:s.rich...@data-artisans.com]
> *Sent:* Thursday, January 04, 2018 11:15 AM
> *To:* Netzer, Liron [ICG-IT]
> *Cc:* user@flink.apache.org
> *Subject:* Re: Lower Parallelism derives better latency
>
> Hi,
>
> ok that would have been good to know, so forget about my explanation
> attempt :-). This makes it interesting, and at the same time I cannot come
> up with an „easy“ explanation. It is not even clear if the reason for this
> is a general problem in Flink, your setup, or caused by something that your
> job is doing. Two more questions: What happens to the throughput in that
> experiment? Does it also decrease or increase? I just want to rule out that
> some general overhead is introduced. Second, do you have or could you
> create some (minimal) code example to reproduce the problem that you could
> share with us (of course you can also share this in privat)? This would be
> very helpful!
>
> Best,
> Stefan
>
>
> Am 04.01.2018 um 08:45 schrieb Netzer, Liron :
>
> Hi Stefan,
> Thanks for replying.
>
> All of the tests below were executed with a buffer timeout of zero:
>
>  env.setBufferTimeout(0);
>
> so this means that the buffers were flushed after each record.
>
> Any other explanation? J
>
> Thanks,
> Liron
>
>
> *From:* Stefan Richter [mailto:s.rich...@data-artisans.com]
> *Sent:* Wednesday, January 03, 2018 3:20 PM
> *To:* Netzer, Liron [ICG-IT]
> *Cc:* user@flink.apache.org
> *Subject:* Re: Lower Parallelism derives better latency
>
> Hi,
>
> one possible explanation that I see is the following: in a shuffle, each
> there are input and output buffers for each parallel subtask to which data
> could be shuffled. Those buffers are flushed either when full or after a
> timeout interval. If you increase the parallelism, there are more buffers
> and each buffer gets a smaller fraction of the data. This, in turn, means
> that it takes longer until an individual buffer is full and data is
> emitted. The timeout interval enforces an upper bound.
>
> Your experiments works on a very small scale, and I would not assume that
> this would increase latency without bounds - at least once you hit the
> buffer timeout interval the latency should no longer increase. You could
> validate this by configuring smaller buffer sizes and test how this impacts
> the experiment.
>
> Best,
> Stefan
>
>
>
> Am 03.01.2018 um 08:13 schrieb Netzer, Liron :
>
> Hi group,
>
> We have a standalone Flink cluster that is running on a UNIX host with 40
> CPUs and 256GB RAM.
> There is one task manager and 24 slots were defined.
> When we decrease the parallelism of the Stream graph operators(each
> operator has the same parallelism),
> we see a consistent change in the latency, it gets better:
> Test run | Parallelism | 99 percentile | 95 percentile | 75 percentile | Mean
> #1       | 8           | 4.15 ms       | 2.02 ms       | 0.22 ms       | 0.42 ms
> #2       | 7           | 3.6 ms        | 1.68 ms       | 0.14 ms       | 0.34 ms
> #3       | 6           | 3 ms          | 1.4 ms        | 0.13 ms       | 0.3 ms
> #4       | 5           | 2.1 ms        | 0.95 ms       | 0.1 ms        | 0.22 ms
> #5       | 4           | 1.5 ms        | 0.64 ms       | 0.09 ms       | 0.16 ms
>
> This was a surprise for us, as we expected that higher parallelism will
> derive better latency.
> Could you try to assist us to understand this behavior?
> I know that when there are more threads that are involved, there is
> probably more serialization/deserialization, but this can't be the only
> reason for this behavior.
>
> We have two Kafka sources, and the rest of the operators are fixed
> windows, flatmaps, coMappers and *several* KeyBys.
> Except for the Kafka sources and some internal logging, there is no other
> I/O (i.e. we do not connect to any external DB/queue)
> We use Flink 1.3.
>
>
> Thanks,
> Liron
>
>
>


Re: Python API not working

2018-01-04 Thread Yassine MARZOUGUI
Hi all,

Any ideas on this?


2017-12-15 15:10 GMT+01:00 Yassine MARZOUGUI :

> Hi Ufuk,
>
> Thanks for your response. Unfortunately, specifying `streaming` or `batch`
> doesn't work; it looks like the mode should be either "plan" or "operator",
> and then the program expects other inputs from stdin (id, port, etc.).
>
> 2017-12-15 14:23 GMT+01:00 Ufuk Celebi :
>
>> Hey Yassine,
>>
>> let me include Chesnay (cc'd) who worked on the Python API.
>>
>> I'm not familiar with the API and what it expects, but try entering
>> `streaming` or `batch` for the mode. Chesnay probably has the details.
>>
>> – Ufuk
>>
>>
>> On Fri, Dec 15, 2017 at 11:05 AM, Yassine MARZOUGUI
>>  wrote:
>> > Hi All,
>> >
>> > I'm trying to use Flink with the python API, and started with the
>> wordcount
>> > exemple from the Documentation. I'm using Flink 1.4 and python 2.7.
>> > When running env.execute(local=True), the command doesn't execute and
>> keeps
>> > waiting for input. If I hit enter again I get the following error from
>> > Environment.py : ValueError("Invalid mode specified: " + mode)
>> > Looking at the source code, it looks like there are a bunch of
>> > sys.stdin.readline().rstrip('\n') where an input is expected from the
>> user.
>> > Any idea how to run the job? Thank you.
>> >
>> > Best,
>> > Yassine
>> >
>>
>
>


Re: Lower Parallelism derives better latency

2018-01-04 Thread Stefan Richter
Hi,

ok, throughput sounds good then, and I assume there is also no unexpected
increase in CPU usage? For the code example, maybe it is possible to minimize
the code (dropping all the confidential business logic, using simple generator
sources, …), while still keeping the general shape of the job intact?

This is the first report of a problem like this that I am aware of. And
unfortunately, there could be many factors. For example, if you are using event
time, could changing the parallelism impact your watermark progression and
therefore the time it takes to trigger your windows?

Best,
Stefan

> Am 04.01.2018 um 10:30 schrieb Netzer, Liron :
> 
> Hi Stefan,
> The throughput was the same in all of the executions. This was well validated 
> in each test, as this is what I also suspected that can be effected. 
> The throughput was ~110,000 messages per second.
> Regarding the code example, this is a bit confidential, let me think what I 
> can do and get back to you.
> Am I the first one who encountered such an issue?
>  
> Thanks,
> Liron
>  
>  
> From: Stefan Richter [mailto:s.rich...@data-artisans.com] 
> Sent: Thursday, January 04, 2018 11:15 AM
> To: Netzer, Liron [ICG-IT]
> Cc: user@flink.apache.org
> Subject: Re: Lower Parallelism derives better latency
>  
> Hi,
>  
> ok that would have been good to know, so forget about my explanation attempt 
> :-). This makes it interesting, and at the same time I cannot come up with an 
> „easy“ explanation. It is not even clear if the reason for this is a general 
> problem in Flink, your setup, or caused by something that your job is doing. 
> Two more questions: What happens to the throughput in that experiment? Does 
> it also decrease or increase? I just want to rule out that some general 
> overhead is introduced. Second, do you have or could you create some 
> (minimal) code example to reproduce the problem that you could share with us 
> (of course you can also share this in privat)? This would be very helpful!
>  
> Best,
> Stefan
> 
> 
> Am 04.01.2018 um 08:45 schrieb Netzer, Liron:
>  
> Hi Stefan,
> Thanks for replying.
> All of the tests below were executed with a buffer timeout of zero:
>  env.setBufferTimeout(0);
> so this means that the buffers were flushed after each record.
>  
> Any other explanation? J
>  
> Thanks,
> Liron
>  
>  
> From: Stefan Richter [mailto:s.rich...@data-artisans.com]
> Sent: Wednesday, January 03, 2018 3:20 PM
> To: Netzer, Liron [ICG-IT]
> Cc: user@flink.apache.org 
> Subject: Re: Lower Parallelism derives better latency
>  
> Hi,
>  
> one possible explanation that I see is the following: in a shuffle, each 
> there are input and output buffers for each parallel subtask to which data 
> could be shuffled. Those buffers are flushed either when full or after a 
> timeout interval. If you increase the parallelism, there are more buffers and 
> each buffer gets a smaller fraction of the data. This, in turn, means that it 
> takes longer until an individual buffer is full and data is emitted. The 
> timeout interval enforces an upper bound.
>  
> Your experiments works on a very small scale, and I would not assume that 
> this would increase latency without bounds - at least once you hit the buffer 
> timeout interval the latency should no longer increase. You could validate 
> this by configuring smaller buffer sizes and test how this impacts the 
> experiment.
>  
> Best,
> Stefan
> 
> 
> 
> Am 03.01.2018 um 08:13 schrieb Netzer, Liron:
>  
> Hi group,
>  
> We have a standalone Flink cluster that is running on a UNIX host with 40 
> CPUs and 256GB RAM.
> There is one task manager and 24 slots were defined.
> When we decrease the parallelism of the Stream graph operators(each operator 
> has the same parallelism),
> we see a consistent change in the latency, it gets better:
> Test run | Parallelism | 99 percentile | 95 percentile | 75 percentile | Mean
> #1       | 8           | 4.15 ms       | 2.02 ms       | 0.22 ms       | 0.42 ms
> #2       | 7           | 3.6 ms        | 1.68 ms       | 0.14 ms       | 0.34 ms
> #3       | 6           | 3 ms          | 1.4 ms        | 0.13 ms       | 0.3 ms
> #4       | 5           | 2.1 ms        | 0.95 ms       | 0.1 ms        | 0.22 ms
> #5       | 4           | 1.5 ms        | 0.64 ms       | 0.09 ms       | 0.16 ms
>  
> This was a surprise for us, as we expected that higher parallelism will 
> derive better latency.
> Could you try to assist us to understand this behavior? 
> I know that when there are more threads that are involved, there is probably 
> more serialization/deserialization, but this can't be the only reason for 
> this behavior.
>  
> We have two Kafka sources, and the rest of the operators are fixed windows, 
> flatmaps, coMappers and several KeyBys. 
> Except for the Kafka sources and some internal logging, there is no other I/O 
> (i.e. we do not connect to any external DB/queue)
> We use Flink 1.3.
>  
>  
> Thanks,
> Liron



RE: Lower Parallelism derives better latency

2018-01-04 Thread Netzer, Liron
Hi Stefan,
The throughput was the same in all of the executions. This was well validated
in each test, as this is what I also suspected could be affected.
The throughput was ~110,000 messages per second.
Regarding the code example, this is a bit confidential, let me think what I can 
do and get back to you.
Am I the first one who encountered such an issue?

Thanks,
Liron


From: Stefan Richter [mailto:s.rich...@data-artisans.com]
Sent: Thursday, January 04, 2018 11:15 AM
To: Netzer, Liron [ICG-IT]
Cc: user@flink.apache.org
Subject: Re: Lower Parallelism derives better latency

Hi,

ok that would have been good to know, so forget about my explanation attempt 
:-). This makes it interesting, and at the same time I cannot come up with an 
„easy“ explanation. It is not even clear if the reason for this is a general 
problem in Flink, your setup, or caused by something that your job is doing. 
Two more questions: What happens to the throughput in that experiment? Does it 
also decrease or increase? I just want to rule out that some general overhead 
is introduced. Second, do you have or could you create some (minimal) code 
example to reproduce the problem that you could share with us (of course you 
can also share this in privat)? This would be very helpful!

Best,
Stefan


Am 04.01.2018 um 08:45 schrieb Netzer, Liron:

Hi Stefan,
Thanks for replying.

All of the tests below were executed with a buffer timeout of zero:

 env.setBufferTimeout(0);
so this means that the buffers were flushed after each record.

Any other explanation? ☺

Thanks,
Liron


From: Stefan Richter [mailto:s.rich...@data-artisans.com]
Sent: Wednesday, January 03, 2018 3:20 PM
To: Netzer, Liron [ICG-IT]
Cc: user@flink.apache.org
Subject: Re: Lower Parallelism derives better latency

Hi,

one possible explanation that I see is the following: in a shuffle, each there 
are input and output buffers for each parallel subtask to which data could be 
shuffled. Those buffers are flushed either when full or after a timeout 
interval. If you increase the parallelism, there are more buffers and each 
buffer gets a smaller fraction of the data. This, in turn, means that it takes 
longer until an individual buffer is full and data is emitted. The timeout 
interval enforces an upper bound.

Your experiments works on a very small scale, and I would not assume that this 
would increase latency without bounds - at least once you hit the buffer 
timeout interval the latency should no longer increase. You could validate this 
by configuring smaller buffer sizes and test how this impacts the experiment.

Best,
Stefan



Am 03.01.2018 um 08:13 schrieb Netzer, Liron:

Hi group,

We have a standalone Flink cluster that is running on a UNIX host with 40 CPUs 
and 256GB RAM.
There is one task manager and 24 slots were defined.
When we decrease the parallelism of the Stream graph operators(each operator 
has the same parallelism),
we see a consistent change in the latency, it gets better:
Test run | Parallelism | 99 percentile | 95 percentile | 75 percentile | Mean
#1       | 8           | 4.15 ms       | 2.02 ms       | 0.22 ms       | 0.42 ms
#2       | 7           | 3.6 ms        | 1.68 ms       | 0.14 ms       | 0.34 ms
#3       | 6           | 3 ms          | 1.4 ms        | 0.13 ms       | 0.3 ms
#4       | 5           | 2.1 ms        | 0.95 ms       | 0.1 ms        | 0.22 ms
#5       | 4           | 1.5 ms        | 0.64 ms       | 0.09 ms       | 0.16 ms


This was a surprise for us, as we expected that higher parallelism will derive 
better latency.
Could you try to assist us to understand this behavior?
I know that when there are more threads that are involved, there is probably 
more serialization/deserialization, but this can't be the only reason for this 
behavior.

We have two Kafka sources, and the rest of the operators are fixed windows, 
flatmaps, coMappers and several KeyBys.
Except for the Kafka sources and some internal logging, there is no other I/O 
(i.e. we do not connect to any external DB/queue)
We use Flink 1.3.


Thanks,
Liron



Re: Job Manager not able to fetch job info when restarted

2018-01-04 Thread Stefan Richter
Yes, HA is required for what you want to do. I am not aware of an alert 
mechanism in Flink, but would assume that this is something that should better 
be solved on the YARN level?

Best,
Stefan

> Am 03.01.2018 um 19:27 schrieb Sushil Ks :
> 
> Hi Stefan,
>  It was just ran without HA, via yarn. Will try running a yarn 
> session with HA and test, thanks for you time. Also is there a way we can get 
> alerts if a pipeline restarts or gets cancelled in Flink?
> 
> Regards,
> Sushil Ks
> 
> 
> On Jan 3, 2018 7:07 PM, "Stefan Richter" wrote:
> Hi,
> 
> did you configure high availability with Zookeeper, as described here: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html ?
> This should bring the desired behaviour.
> 
> Best,
> Stefan
> 
>> Am 03.01.2018 um 14:04 schrieb Sushil Ks:
>> 
>> Hi,
>>   We are launching Flink on Yarn job, here whenever the job manager gets 
>> restarted, the previous task that was running doesn't get restarted instead 
>> it shows no jobs running. Is there a way to resume previous running task, 
>> when job manager restarts.
>> 
>> Regards,
>> Sushil Ks 
> 



Re: Lower Parallelism derives better latency

2018-01-04 Thread Stefan Richter
Hi,

ok that would have been good to know, so forget about my explanation attempt 
:-). This makes it interesting, and at the same time I cannot come up with an 
„easy“ explanation. It is not even clear if the reason for this is a general 
problem in Flink, your setup, or caused by something that your job is doing. 
Two more questions: What happens to the throughput in that experiment? Does it 
also decrease or increase? I just want to rule out that some general overhead 
is introduced. Second, do you have or could you create some (minimal) code 
example to reproduce the problem that you could share with us (of course you 
can also share this in privat)? This would be very helpful!

Best,
Stefan

> Am 04.01.2018 um 08:45 schrieb Netzer, Liron :
> 
> Hi Stefan,
> Thanks for replying.
> All of the tests below were executed with a buffer timeout of zero:
>  env.setBufferTimeout(0);
> so this means that the buffers were flushed after each record.
>  
> Any other explanation? J
>  
> Thanks,
> Liron
>  
>  
> From: Stefan Richter [mailto:s.rich...@data-artisans.com] 
> Sent: Wednesday, January 03, 2018 3:20 PM
> To: Netzer, Liron [ICG-IT]
> Cc: user@flink.apache.org
> Subject: Re: Lower Parallelism derives better latency
>  
> Hi,
>  
> one possible explanation that I see is the following: in a shuffle, each 
> there are input and output buffers for each parallel subtask to which data 
> could be shuffled. Those buffers are flushed either when full or after a 
> timeout interval. If you increase the parallelism, there are more buffers and 
> each buffer gets a smaller fraction of the data. This, in turn, means that it 
> takes longer until an individual buffer is full and data is emitted. The 
> timeout interval enforces an upper bound.
>  
> Your experiments works on a very small scale, and I would not assume that 
> this would increase latency without bounds - at least once you hit the buffer 
> timeout interval the latency should no longer increase. You could validate 
> this by configuring smaller buffer sizes and test how this impacts the 
> experiment.
>  
> Best,
> Stefan
> 
> 
> Am 03.01.2018 um 08:13 schrieb Netzer, Liron:
>  
> Hi group,
>  
> We have a standalone Flink cluster that is running on a UNIX host with 40 
> CPUs and 256GB RAM.
> There is one task manager and 24 slots were defined.
> When we decrease the parallelism of the Stream graph operators(each operator 
> has the same parallelism),
> we see a consistent change in the latency, it gets better:
> Test run | Parallelism | 99 percentile | 95 percentile | 75 percentile | Mean
> #1       | 8           | 4.15 ms       | 2.02 ms       | 0.22 ms       | 0.42 ms
> #2       | 7           | 3.6 ms        | 1.68 ms       | 0.14 ms       | 0.34 ms
> #3       | 6           | 3 ms          | 1.4 ms        | 0.13 ms       | 0.3 ms
> #4       | 5           | 2.1 ms        | 0.95 ms       | 0.1 ms        | 0.22 ms
> #5       | 4           | 1.5 ms        | 0.64 ms       | 0.09 ms       | 0.16 ms
>  
> This was a surprise for us, as we expected that higher parallelism will 
> derive better latency.
> Could you try to assist us to understand this behavior? 
> I know that when there are more threads that are involved, there is probably 
> more serialization/deserialization, but this can't be the only reason for 
> this behavior.
>  
> We have two Kafka sources, and the rest of the operators are fixed windows, 
> flatmaps, coMappers and several KeyBys. 
> Except for the Kafka sources and some internal logging, there is no other I/O 
> (i.e. we do not connect to any external DB/queue)
> We use Flink 1.3.
>  
>  
> Thanks,
> Liron



Re: Apache Flink - Question about rolling window function on KeyedStream

2018-01-04 Thread Fabian Hueske
Hi,

the ReduceFunction holds the last emitted record as state. When a new
record arrives, it reduces the new record and last emitted record, updates
its state, and emits the new result.
Therefore, a ReduceFunction emits one output record for each input record,
i.e., it is triggered for each input record. The output of the
ReduceFunction should be treated as a stream of updates not of final
results.

Best, Fabian
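
As a small illustration (Scala API; the input elements are made up), a keyed
rolling sum emits one updated value per incoming record:

import org.apache.flink.streaming.api.scala._

object RollingReduceExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)  // keep the printed order deterministic for the example

    val input: DataStream[(String, Int)] =
      env.fromElements(("a", 1), ("a", 2), ("b", 5), ("a", 3))

    // emits ("a",1), ("a",3), ("b",5), ("a",6): a stream of updates, not one final sum per key
    input
      .keyBy(0)
      .reduce((v1, v2) => (v1._1, v1._2 + v2._2))
      .print()

    env.execute("rolling reduce example")
  }
}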

2018-01-03 18:46 GMT+01:00 M Singh :

> Hi Stefan:
>
> Thanks for your response.
>
> A follow up question - In a streaming environment, we invoke the operation
> reduce and then output results to the sink. Does this mean reduce will be
> executed once on every trigger per partition with all the items in each
> partition ?
>
> Thanks
>
>
> On Wednesday, January 3, 2018 2:46 AM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>
> Hi,
>
> I would interpret this as: the reduce produces an output for every new
> reduce call, emitting the updated value. There is no need for a window
> because it kicks in on every single invocation.
>
> Best,
> Stefan
>
>
> Am 31.12.2017 um 22:28 schrieb M Singh :
>
> Hi:
>
> Apache Flink documentation (https://ci.apache.org/
> projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html)
> indicates that a reduce function on a KeyedStream  as follows:
>
> A "rolling" reduce on a keyed data stream. Combines the current element
> with the last reduced value and emits the new value.
>
> A reduce function that creates a stream of partial sums:
>
> keyedStream.reduce(new ReduceFunction() {
> @Override
> public Integer reduce(Integer value1, Integer value2)
> throws Exception {
> return value1 + value2;
> }});
>
>
> The KeyedStream is not windowed, so when does the reduce function kick in
> to produce the DataStream (ie, is there a default time out, or collection
> size that triggers it, since we have not defined any window on it).
>
> Thanks
>
> Mans
>
>
>
>
>


Re: keyby() issue

2018-01-04 Thread Jinhua Luo
The app is very simple, please see the code snippet:

https://gist.github.com/kingluo/e06381d930f34600e42b050fef6baedd


I reran the app, and oddly enough it now produces results continuously.

But it has two new issues:
a) memory usage is too high: it uses about 8 GB of heap memory. Why? Is it
because the traffic is too high?
b) the Redis async I/O tends to time out, which fails the whole pipeline.



2018-01-03 0:41 GMT+08:00 Timo Walther :
> Hi Jinhua,
>
> did you check the key group assignments? What is the distribution of
> "MathUtils.murmurHash(keyHash) % maxParallelism" on a sample of your data?
> This also depends on the hashCode on the output of your KeySelector.
>
> keyBy should handle high traffic well, but it is designed for key spaces
> with thousands or millions of values. If this is not the case, you need to
> introduce some more artifical key to spread the load more evenly.
>
> Regarding your OutOfMemoryError: I think you producing elements much faster
> than the following operators after keyBy process/discard the elements. Can
> you explain us your job in more detail? Are you using event-time? How do you
> aggregate elements of the windows?
>
> Regards,
> Timo
>
>
>
> Am 1/1/18 um 6:00 AM schrieb Jinhua Luo:
>
>> I checked the logs, but no information indicates what happens.
>>
>> In fact, in the same app, there is another stream, but its kafka
>> source is low traffic, and I aggregate some field of that source too,
>> and flink gives correct results continuously.
>> So I doubt if keyby() could not handle high traffic well (which
>> affects the number of keys in the key partitions).
>>
>> 2018-01-01 2:04 GMT+08:00 Steven Wu :

   but soon later, no results produced, and flink seems busy doing
 something
 forever.
>>>
>>> Jinhua, don't know if you have checked these things. if not, maybe worth
>>> a
>>> look.
>>>
>>> have you tried to do a thread dump?
>>> How is the GC pause?
>>> do you see flink restart? check the exception tab in Flink web UI for
>>> your
>>> job.
>>>
>>>
>>>
>>> On Sun, Dec 31, 2017 at 6:20 AM, Jinhua Luo  wrote:

 I take time to read some source codes about the keyed stream
 windowing, and I make below understanding:

 a) the keyed stream would be split and dispatched to downstream tasks
 in hash manner, and the hash base is the parallelism of the downstream
 operator:

 See

 org.apache.flink.runtime.state.KeyGroupRangeAssignment.computeKeyGroupForKeyHash(int,
 int):
 MathUtils.murmurHash(keyHash) % maxParallelism;

 That's what the doc said "hash partitioning".

 So the compiled execution graph already determines whose operator
 instance receive which key groups.

 b) with windowing, the key is used to index window states, so the
 window function would receive the deserialized value from its
 corresponding window state of some key.

 b.1) The element would be added into the state first:

 See

 org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(StreamRecord):
 windowState.add(element.getValue());

 b.2) when the trigger fires the window, the value would be
 deserialized from the keyed state:

 ACC contents = windowState.get();
 emitWindowContents(actualWindow, contents);

 For rocksdb backend, each input element would be taken back and forth
 from the disk in the processing.

 flink's keyed stream has the same functionality as storm's field
 grouping, and more complicated.

 Am I correct?


 But I still could not understand why keyby() stops flink from
 returning expected results.

 Let me explain my case more:
 I use kafka data source, which collects log lines of log files from
 tens of machines.
 The log line is in json format, which contains the "ip" field, the ip
 address of the user, so it could be valued in million of ip addresses
 of the Internet.
 The stream processing is expected to result in ip aggregation in {1
 hour, 1 min} sliding window.

 If I use keyBy("ip"), then at first minutes, the flink could give me
 correct aggregation results, but soon later, no results produced, and
 flink seems busy doing something forever.

 I doubt if keyby() could handle huge keys like this case, and when I
 remove keyby().window().fold() and use windowAll().fold() instead (the
 latter fold operator uses hashmap to aggregate ip by itself), flink
 works. But as known, the windowAll() is not scale-able.

 Could flink developers help me on this topic, I prefer flink and I
 believe flink is one of best stream processing frameworks, but I am
 really frustrated that flink could be fulfill its feature just like
 the doc said.

 Thank you all.


 2017-12-29 17:42 GMT+08:00 Jinhua Luo :
>
> I misuse the key selector. I checked 
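
For reference, the key-group distribution check suggested in the quoted reply
can be scripted offline roughly like this (a sketch; the sample keys are
made-up IPs, and maxParallelism must match the job's max parallelism, 128 by
default):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment

object KeyGroupDistribution {
  def main(args: Array[String]): Unit = {
    val maxParallelism = 128
    val sampleKeys = Seq("10.0.0.1", "10.0.0.2", "192.168.1.7", "172.16.4.9")

    val histogram = sampleKeys
      .map(k => KeyGroupRangeAssignment.computeKeyGroupForKeyHash(k.hashCode, maxParallelism))
      .groupBy(identity)
      .mapValues(_.size)

    // key group -> number of sample keys; heavy skew here would explain uneven load
    histogram.toSeq.sortBy(_._1).foreach { case (group, count) => println(s"$group -> $count") }
  }
}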

Re: Apache Flink - broadcasting DataStream

2018-01-04 Thread Fabian Hueske
Hi,

A broadcast replicates all elements by the parallelism of the following
operator, i.e., each parallel instance of the following operator receives
all events of its input stream.
If the operation following a broadcast is a connect and a co-operator (and
the other input is not broadcasted), the operator can be used for arbitrary
joins (not only equi-joins).
Depending on the join semantics, the joining operator needs to put events
from one (broadcasted or non-broadcasted) or both inputs as state to be
able to perform the join.

Best, Fabian
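
As a small sketch of that broadcast-plus-connect pattern (names are made up;
the rule buffer below is a plain instance field, i.e. not checkpointed state):

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.collection.mutable

case class Rule(pattern: String)
case class Event(payload: String)

// every parallel instance sees *all* rules because the rule stream is broadcast,
// so the join predicate can be arbitrary (not just an equi-join)
class BroadcastJoin extends CoFlatMapFunction[Rule, Event, (Rule, Event)] {
  private val rules = mutable.ArrayBuffer[Rule]()

  override def flatMap1(rule: Rule, out: Collector[(Rule, Event)]): Unit =
    rules += rule

  override def flatMap2(event: Event, out: Collector[(Rule, Event)]): Unit =
    rules.filter(r => event.payload.contains(r.pattern))
         .foreach(r => out.collect((r, event)))
}

// usage: ruleStream.broadcast.connect(eventStream).flatMap(new BroadcastJoin)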


2017-12-30 23:32 GMT+01:00 M Singh :

> Hi Ufuk:
>
> Thanks for your explanation.
>
> I can understand broadcasting a small immutable dataset to the subtasks so
> that they can be joined with a stream.
>
> However I am still trying to understand how will each broadcasted element
> from a stream be used in join operation with another stream.  Is this just
> on optimization over joining two streams ?
>
> Also, I believe that substasks are operating on partitions of a stream and
> only equi-joins are possible for streams.  So what is the reason we would
> like to broadcast each element to all the substasks ?
>
> Thanks again.
>
>
> On Wednesday, December 27, 2017 12:52 AM, Ufuk Celebi 
> wrote:
>
>
> Hey Mans!
>
> This refers to how sub tasks are connected to each other in your
> program. If you have a single sub task A1 and three sub tasks B1, B2,
> B3, broadcast will emit each incoming record at A1 to all B1, B2, B3:
>
> A1 --+-> B1
> +-> B2
> +-> B3
>
> Does this help?
>
> On Mon, Dec 25, 2017 at 7:12 PM, M Singh  wrote:
> > 1 What elements get broad to the partitions ?
>
> Each incoming element is broadcasted
>
> > 2. What happens as new elements are added to the stream ? Are only the
> new
> > elements broadcast ?
>
> Yes, each incoming element is broadcasted separately without any history.
>
> > 3. Since the broadcast operation returns a DataStream can it be used in
> join
> > how do new (and old) elements affect the join results ?
>
> Yes, should be. Every element is broadcasted only once.
>
>
> > 4. Similarly how does broadcast work with connected streams ?
>
>
> Similar to non connected streams. The incoming records are emitted to
> every downstream partition.
>
> – Ufuk
>
>
>