ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

2019-02-08 Thread Averell
Hello,

I am trying to follow this Flink guide [1] to handle errors in
ElasticSearchSink by re-adding the failed messages to the queue.
The error scenarios that I get and want to retry are: (i) a conflict in the
UpdateRequest document version and (ii) a lost connection to ElasticSearch.
These errors are expected to be transient: (i) should be resolved by changing
the version, and (ii) should be gone after some seconds.
What I expect is that the failed message gets retried successfully.
What I actually got was: Flink seemed to get stuck on that (first) retry, my
flow queued up (backpressure is 1 everywhere), and all processing hung.

Here is my error handling code:


private object MyElasticSearchFailureHandler extends ActionRequestFailureHandler {
  override def onFailure(actionRequest: ActionRequest, failure: Throwable,
                         restStatusCode: Int, indexer: RequestIndexer): Unit = {
    if (ExceptionUtils.findThrowableWithMessage(failure,
        "version_conflict_engine_exception") != Optional.empty()) {
      actionRequest match {
        case s: UpdateRequest =>
          LOG.warn(s"Failed inserting record to ElasticSearch due to version conflict (${s.version()}). Retrying")
          LOG.warn(actionRequest.toString)
          indexer.add(s.version(s.version() + 1))
        case _ =>
          LOG.error("Failed inserting record to ElasticSearch due to version conflict. However, this is not an Update-Request. Don't know why.")
          LOG.error(actionRequest.toString)
          throw failure
      }
    } else if (restStatusCode == -1 && failure.getMessage.contains("Connection closed")) {
      LOG.warn(s"Retrying record: ${actionRequest.toString}")
      actionRequest match {
        case s: UpdateRequest => indexer.add(s)
        case s: IndexRequest => indexer.add(s)
      }
    } else {
      LOG.error(s"ELASTICSEARCH FAILED:\n    statusCode $restStatusCode\n    message: ${failure.getMessage}\n${failure.getStackTrace}")
      LOG.error(s"DATA:\n    ${actionRequest.toString}")
      throw failure
    }
  }
}
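
For context, this is roughly how the handler is attached to the sink. It is only a minimal sketch assuming the elasticsearch6 connector's builder API; the host name, the MyRecord element type and the createUpdateRequest helper are illustrative placeholders, not the actual job code:

// Sketch only: assumes flink-connector-elasticsearch6 and Flink 1.7.
import java.util.Collections
import org.apache.http.HttpHost
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.common.xcontent.XContentType

case class MyRecord(id: String, json: String) // placeholder element type

def createUpdateRequest(r: MyRecord): UpdateRequest = // placeholder request builder
  new UpdateRequest("idx", "_doc", r.id).doc(r.json, XContentType.JSON).docAsUpsert(true)

val httpHosts = Collections.singletonList(new HttpHost("es-host", 9200, "http"))

val builder = new ElasticsearchSink.Builder[MyRecord](httpHosts,
  new ElasticsearchSinkFunction[MyRecord] {
    override def process(element: MyRecord, ctx: RuntimeContext, indexer: RequestIndexer): Unit =
      indexer.add(createUpdateRequest(element))
  })

builder.setFailureHandler(MyElasticSearchFailureHandler) // the handler shown above
builder.setBulkFlushMaxActions(1000)

// myStream.addSink(builder.build())  // myStream stands for the DataStream being written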


Here is the extract from my task-manager logs:

2019-02-09 04:12:35.676 [I/O dispatcher 25] ERROR
o.a.f.s.connectors.elasticsearch.ElasticsearchSinkBase  - Failed
Elasticsearch bulk request: Connection closed
2019-02-09 04:12:35.678 [I/O dispatcher 25] WARN 
c.n.c..sink.MyElasticSearchSink$  - Retrying record: update
{[idx-20190208][_doc][doc_id_154962270], doc_as_upsert[true], doc[index
{*[null][null][null]*, source[{...}]}], scripted_upsert[false],
detect_noop[true]}
2019-02-09 04:12:54.242 [Sink: S3 - Historical (1/4)] INFO 
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=24 (max part counter=26).

And job-manager logs:
2019-02-09 03:59:37.880 [flink-akka.actor.default-dispatcher-4] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
checkpoint 23 for job 1a1438ca23387c4ef9a59ff9da6dafa1 (430392865 bytes in
307078 ms).
2019-02-09 04:09:30.970 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
checkpoint 24 @ 1549685370776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
2019-02-09 04:17:00.970 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 24
of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
2019-02-09 04:24:31.035 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
checkpoint 25 @ 1549686270776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
2019-02-09 04:32:01.035 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 25
of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
2019-02-09 04:39:30.961 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
checkpoint 26 @ 1549687170776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.

Thanks and best regards,
Averell

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests





Re: Get nested Rows from Json string

2019-02-08 Thread Rong Rong
Hi François,

I just did some research, and it seems like this is in fact a stringification issue.
If you run one of the AvroRowDeSerializationSchemaTest tests [1],
you will find that only MAP and ARRAY are stringified correctly (Map using
"{}" quoting and Array using "[]" quoting).
Nested records, however, are not quoted using "()".

I wasn't sure whether this should be considered a bug in the toString method of
the Row type. I just filed a JIRA [2] for this issue; feel free to comment on
the discussion.
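
To illustrate, here is a tiny sketch using plain org.apache.flink.types.Row (no Flink job needed); the printed form is what I would expect from the current toString, shown only to describe the behaviour:

import org.apache.flink.types.Row

val nested = new Row(1)
nested.setField(0, "g")

val top = new Row(3)
top.setField(0, "b")
top.setField(1, "d")
top.setField(2, nested) // setField does not append: field 2 really holds the nested Row

println(top.getField(2).isInstanceOf[Row]) // true -- the structure is intact
println(top) // but prints flattened, roughly "b,d,g", with no "()" around the nested record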

--
Rong

[1]
https://github.com/apache/flink/blob/release-1.7/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
[2] https://issues.apache.org/jira/browse/FLINK-11569

On Fri, Feb 8, 2019 at 8:51 AM françois lacombe <
francois.laco...@dcbrain.com> wrote:

> Hi Rong,
>
> Thank you for this answer.
> I've changed Rows to Map, which ease the conversion process.
>
> Nevertheless I'm interested in any explanation about why row1.setField(i,
> row2) appends row2 at the end of row1.
>
> All the best
>
> François
>
> Le mer. 6 févr. 2019 à 19:33, Rong Rong  a écrit :
>
>> Hi François,
>>
>> I wasn't exactly sure this is a JSON object or JSON string you are trying
>> to process.
>> For a JSON string this [1] article might help.
>> For a JSON object, I am assuming you are trying to convert it into a
>> TableSource and processing using Table/SQL API, you could probably use the
>> example here [2]
>>
>> BTW, a very remote hunch, this might be just a stringify issue how you
>> print the row out.
>>
>> --
>> Rong
>>
>> [1]:
>> https://stackoverflow.com/questions/49380778/how-to-stream-a-json-using-flink
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sourceSinks.html#table-sources-sinks
>>
>> On Wed, Feb 6, 2019 at 3:06 AM françois lacombe <
>> francois.laco...@dcbrain.com> wrote:
>>
>>> Hi all,
>>>
>>> I currently get a json string from my pgsql source with nested objects
>>> to be converted into Flink's Row.
>>> Nested json objects should go in nested Rows.
>>> An avro schema rules the structure my source should conform to.
>>>
>>> According to this json :
>>> {
>>>   "a":"b",
>>>   "c":"d",
>>>   "e":{
>>>"f":"g"
>>>}
>>> }
>>>
>>> ("b", "d", Row("g")) is expected as a result according to my avro schema.
>>>
>>> I wrote a recursive method which iterate over json objects and put
>>> nested Rows at right indices in their parent but here is what outputs :
>>> ("b", "d", "g")
>>> Child Row is appended to the parent. I don't understand why.
>>> Obviously, process is crashing arguing the top level Row arity doesn't
>>> match serializers.
>>>
>>> Is there some native methods in Flink to achieve that?
>>> I don't feel so comfortable to have written my own json processor for
>>> this job.
>>>
>>> Do you have any hint which can help please ?
>>>
>>> All the best
>>>
>>> François
>>>


Re: Running JobManager as Deployment instead of Job

2019-02-08 Thread Vishal Santoshi
In one case, however, we do want to retain the same cluster id (think
ingress on k8s, and thus SLAs with external touch points), but it is
essentially a new job (it adds an incompatible change, although at the interface
level it retains the same contract). The only way seems to be to remove
the chroot/subcontext from ZK and relaunch, essentially deleting any
vestiges of the previous incarnation. And that is fine if that is indeed
the process.


On Fri, Feb 8, 2019 at 7:58 AM Till Rohrmann  wrote:

> If you keep the same cluster id, the upgraded job should pick up
> checkpoints from the completed checkpoint store. However, I would recommend
> to take a savepoint and resume from this savepoint because then you can
> also specify that you allow non restored state, for example.
>
> Cheers,
> Till
>
> On Fri, Feb 8, 2019 at 11:20 AM Vishal Santoshi 
> wrote:
>
>> Is the rationale of using a jobID 00* also roughly the same. As in a
>> Flink job cluster is a single job and thus a single job id suffices ?  I am
>> more wondering about the case when we are doing a compatible changes to a
>> job and want to resume ( given we are in HA mode and thus have a
>> chroot/subcontext on ZK for the job cluster ) ,  it would make no sense to
>> give a brand new job id ?
>>
>> On Thu, Feb 7, 2019 at 4:42 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Sergey,
>>>
>>> the rationale why we are using a K8s job instead of a deployment is that
>>> a Flink job cluster should terminate after it has successfully executed the
>>> Flink job. This is unlike a session cluster which should run forever and
>>> for which a K8s deployment would be better suited.
>>>
>>> If in your use case a K8s deployment would better work, then I would
>>> suggest to change the `job-cluster-job.yaml` accordingly.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Feb 5, 2019 at 4:12 PM Sergey Belikov 
>>> wrote:
>>>
 Hi,

 my team is currently experimenting with Flink running in Kubernetes
 (job cluster setup). And we found out that with JobManager being deployed
 as "Job" we can't just simply update certain values in job's yaml, e.g.
 spec.template.spec.containers.image (
 https://github.com/kubernetes/kubernetes/issues/48388#issuecomment-319493817).
 This causes certain troubles in our CI/CD pipelines so we are thinking
 about using "Deployment" instead of "Job".

 With that being said I'm wondering what was the motivation behind using
 "Job" resource for deploying JobManager? And are there any pitfalls related
 to using Deployment and not Job for JobManager?

 Thank you in advance.
 --
 Best regards,
 Sergey Belikov

>>>


Per-workflow configurations for an S3-related property

2019-02-08 Thread Ken Krugler
Hi all,

When running in EMR, we're encountering the oh-so-common HTTP timeout that's
caused by the connection pool being too small (see below).

I'd found one SO answer that said to bump fs.s3.maxConnections for the EMR S3
filesystem implementation.

I tried to change this for my failing job, via adding to the CLI command line:

-yD fs.s3.maxConnections=2000

But it didn’t seem to have any impact.

I haven’t dug into the Flink CLI code to figure out if -yD can actually be used 
to alter values like this (which come from emrfs-site.xml)

What’s the right way (on a per-job basis) to tweak something like this?

Thanks!

— Ken

==
2019-02-08 21:18:29,669 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Java Job at Fri Feb 08 21:16:58 UTC 2019 (c361522b13121e263460364b1e38db9d) switched from state RUNNING to FAILING.
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1134)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1080)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:745)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:719)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:701)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:669)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:651)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:515)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4443)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4390)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1280)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:8)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:91)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:184)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:96)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:38)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:214)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:758)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:548)
    at org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1842)
    at org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1865)
    at org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.initialize(SequenceFileRecordReader.java:54)
    at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:187)
    at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:59)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
    at sun.reflect.GeneratedMethodAccessor68.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at 

Re: long lived standalone job session cluster in kubernetes

2019-02-08 Thread Heath Albritton
Has any progress been made on this?  There are a number of folks in
the community looking to help out.


-H

On Wed, Dec 5, 2018 at 10:00 AM Till Rohrmann  wrote:
>
> Hi Derek,
>
> there is this issue [1] which tracks the active Kubernetes integration. Jin 
> Sun already started implementing some parts of it. There should also be some 
> PRs open for it. Please check them out.
>
> [1] https://issues.apache.org/jira/browse/FLINK-9953
>
> Cheers,
> Till
>
> On Wed, Dec 5, 2018 at 6:39 PM Derek VerLee  wrote:
>>
>> Sounds good.
>>
>> Is someone working on this automation today?
>>
>> If not, although my time is tight, I may be able to work on a PR for getting 
>> us started down the path Kubernetes native cluster mode.
>>
>>
>> On 12/4/18 5:35 AM, Till Rohrmann wrote:
>>
>> Hi Derek,
>>
>> what I would recommend to use is to trigger the cancel with savepoint 
>> command [1]. This will create a savepoint and terminate the job execution. 
>> Next you simply need to respawn the job cluster which you provide with the 
>> savepoint to resume from.
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint
>>
>> Cheers,
>> Till
>>
>> On Tue, Dec 4, 2018 at 10:30 AM Andrey Zagrebin  
>> wrote:
>>>
>>> Hi Derek,
>>>
>>> I think your automation steps look good.
>>> Recreating deployments should not take long
>>> and as you mention, this way you can avoid unpredictable old/new version 
>>> collisions.
>>>
>>> Best,
>>> Andrey
>>>
>>> > On 4 Dec 2018, at 10:22, Dawid Wysakowicz  wrote:
>>> >
>>> > Hi Derek,
>>> >
>>> > I am not an expert in kubernetes, so I will cc Till, who should be able
>>> > to help you more.
>>> >
>>> > As for the automation for similar process I would recommend having a
>>> > look at dA platform[1] which is built on top of kubernetes.
>>> >
>>> > Best,
>>> >
>>> > Dawid
>>> >
>>> > [1] https://data-artisans.com/platform-overview
>>> >
>>> > On 30/11/2018 02:10, Derek VerLee wrote:
>>> >>
>>> >> I'm looking at the job cluster mode, it looks great and I and
>>> >> considering migrating our jobs off our "legacy" session cluster and
>>> >> into Kubernetes.
>>> >>
>>> >> I do need to ask some questions because I haven't found a lot of
>>> >> details in the documentation about how it works yet, and I gave up
>>> >> following the DI around in the code after a while.
>>> >>
>>> >> Let's say I have a deployment for the job "leader" in HA with ZK, and
>>> >> another deployment for the taskmanagers.
>>> >>
>>> >> I want to upgrade the code or configuration and start from a
>>> >> savepoint, in an automated way.
>>> >>
>>> >> Best I can figure, I can not just update the deployment resources in
>>> >> kubernetes and allow the containers to restart in an arbitrary order.
>>> >>
>>> >> Instead, I expect sequencing is important, something along the lines
>>> >> of this:
>>> >>
>>> >> 1. issue savepoint command on leader
>>> >> 2. wait for savepoint
>>> >> 3. destroy all leader and taskmanager containers
>>> >> 4. deploy new leader, with savepoint url
>>> >> 5. deploy new taskmanagers
>>> >>
>>> >>
>>> >> For example, I imagine old taskmanagers (with an old version of my
>>> >> job) attaching to the new leader and causing a problem.
>>> >>
>>> >> Does that sound right, or am I overthinking it?
>>> >>
>>> >> If not, has anyone tried implementing any automation for this yet?
>>> >>
>>> >
>>>


Can an Aggregate the key from a WindowedStream.aggregate()

2019-02-08 Thread stephen . alan . connolly
If I write my aggregation logic as a WindowFunction, then I get access to the
key as the first parameter in WindowFunction.apply(...); however, the Javadocs
for WindowedStream.apply(WindowFunction) state:

> Note that this function requires that all data in the windows is buffered 
> until the window
> is evaluated, as the function provides no means of incremental aggregation.

Which sounds bad. 

It seems the recommended alternative is to use one of the
WindowedStream.aggregate(AggregateFunction) overloads; however, I cannot see how
to get access to the key...

Is my only solution to transform my data into a Tuple if I need access to the 
key post aggregation?
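
To make the question concrete, here is the shape I am hoping for (a sketch only; Event and the output types are made up, and I am assuming the two-argument aggregate(preAggregator, windowFunction) overload is the intended way to combine incremental aggregation with key access, which is exactly what I cannot confirm):

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class Event(key: String, value: Long) // illustrative input type

// Incremental part: never buffers the window contents, but never sees the key.
class CountAgg extends AggregateFunction[Event, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(e: Event, acc: Long): Long = acc + 1
  override def getResult(acc: Long): Long = acc
  override def merge(a: Long, b: Long): Long = a + b
}

// Window part: only sees the single pre-aggregated value, plus the key and window.
class KeyedCount extends WindowFunction[Long, (String, Long), String, TimeWindow] {
  override def apply(key: String, window: TimeWindow,
                     counts: Iterable[Long], out: Collector[(String, Long)]): Unit =
    out.collect((key, counts.head))
}

def keyedCounts(events: DataStream[Event]): DataStream[(String, Long)] =
  events.keyBy(_.key)
    .timeWindow(Time.minutes(5))
    .aggregate(new CountAgg, new KeyedCount)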

Thanks in advance

-stephenc


[Table] Types of query result and tablesink do not match error

2019-02-08 Thread françois lacombe
Hi all,

An error is currently raised when using table.insertInto("registeredSink")
in Flink 1.7.0 when types of table and sink don't match.

I've got the following :
org.apache.flink.table.api.ValidationException: Field types of query result
and registered TableSink null do not match.
Query result schema: [dynamicFields: Map, staticFields: Map]
TableSink schema:[dynamicFields: Map, staticFields: Map]
at
org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:876)
at org.apache.flink.table.api.Table.insertInto(table.scala:918)

The schemas are the same, and all fields have a GenericType type; I don't
understand why they are considered different.

Do you have any additional way to get extra debug information?
Any hint?
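
For reference, this is the kind of extra introspection I am after (a sketch only; `sink` stands for the TableSink instance I registered under "registeredSink"):

import org.apache.flink.table.api.Table
import org.apache.flink.table.sinks.TableSink

def dumpSchemas(table: Table, sink: TableSink[_]): Unit = {
  // Shows the exact TypeInformation instances; the "[dynamicFields: Map, ...]"
  // rendering in the exception hides the generic parameters being compared.
  println(table.getSchema)
  println(sink.getFieldNames.mkString(", "))
  println(sink.getFieldTypes.mkString(", "))
}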

All the best

François



Re: Get nested Rows from Json string

2019-02-08 Thread françois lacombe
Hi Rong,

Thank you for this answer.
I've changed Rows to Map, which ease the conversion process.

Nevertheless I'm interested in any explanation about why row1.setField(i,
row2) appends row2 at the end of row1.

All the best

François

Le mer. 6 févr. 2019 à 19:33, Rong Rong  a écrit :

> Hi François,
>
> I wasn't exactly sure this is a JSON object or JSON string you are trying
> to process.
> For a JSON string this [1] article might help.
> For a JSON object, I am assuming you are trying to convert it into a
> TableSource and processing using Table/SQL API, you could probably use the
> example here [2]
>
> BTW, a very remote hunch, this might be just a stringify issue how you
> print the row out.
>
> --
> Rong
>
> [1]:
> https://stackoverflow.com/questions/49380778/how-to-stream-a-json-using-flink
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sourceSinks.html#table-sources-sinks
>
> On Wed, Feb 6, 2019 at 3:06 AM françois lacombe <
> francois.laco...@dcbrain.com> wrote:
>
>> Hi all,
>>
>> I currently get a json string from my pgsql source with nested objects to
>> be converted into Flink's Row.
>> Nested json objects should go in nested Rows.
>> An avro schema rules the structure my source should conform to.
>>
>> According to this json :
>> {
>>   "a":"b",
>>   "c":"d",
>>   "e":{
>>"f":"g"
>>}
>> }
>>
>> ("b", "d", Row("g")) is expected as a result according to my avro schema.
>>
>> I wrote a recursive method which iterate over json objects and put nested
>> Rows at right indices in their parent but here is what outputs : ("b", "d",
>> "g")
>> Child Row is appended to the parent. I don't understand why.
>> Obviously, process is crashing arguing the top level Row arity doesn't
>> match serializers.
>>
>> Is there some native methods in Flink to achieve that?
>> I don't feel so comfortable to have written my own json processor for
>> this job.
>>
>> Do you have any hint which can help please ?
>>
>> All the best
>>
>> François



Re: Broadcast state before events stream consumption

2019-02-08 Thread Chirag Dewan
Hi Vadim,

I would be interested in this too.

Presently, I have to read my lookup source in the open method and keep it in a
cache. By doing that, I cannot make use of the broadcast state until, of course,
the first element arrives on the broadcast stream.

The problem with waiting on the event stream is the lack of knowledge about
whether I have read all the data from the lookup source. There is also no
possibility of having a special marker in the data for my use case.

So pre-loading the data seems to be the only option right now.

Thanks,
Chirag
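
For what it's worth, the pre-loading I mean looks roughly like this (a sketch only; Event, Rule, EnrichedEvent and loadReferenceData() are placeholders for my actual lookup types and loader):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

case class Event(key: String, payload: String)     // placeholder event type
case class Rule(pattern: String)                   // placeholder reference-data type
case class EnrichedEvent(event: Event, rule: Rule) // placeholder output type

class EnrichingFlatMap extends RichFlatMapFunction[Event, EnrichedEvent] {
  @transient private var cache: Map[String, Rule] = _

  override def open(parameters: Configuration): Unit = {
    // Blocks until the lookup source has been read completely, so the first
    // event is only processed once all reference data is available.
    cache = loadReferenceData()
  }

  override def flatMap(e: Event, out: Collector[EnrichedEvent]): Unit =
    cache.get(e.key).foreach(rule => out.collect(EnrichedEvent(e, rule)))

  private def loadReferenceData(): Map[String, Rule] = Map.empty // placeholder loader
}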


On Friday, 8 February, 2019, 7:45:37 pm IST, Vadim Vararu wrote:

Hi all,

I need to use the broadcast state mechanism
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html)
for the next scenario.

I have a reference data stream (slow) and an events stream (fast running), and I
want to do a kind of lookup in the reference stream for each event. The
broadcast state mechanism seems to fit the scenario perfectly.

From the documentation: As an example where broadcast state can emerge as a
natural fit, one can imagine a low-throughput stream containing a set of rules
which we want to evaluate against all elements coming from another stream.

However, I am not sure what is the correct way to delay the consumption of the
fast running stream until the slow one is fully read (in case of a file) or
until a marker is emitted (in case of some other source). Is there any way to
accomplish that? It doesn't seem to be a rare use case.

Thanks, Vadim.

Help with a stream processing use case

2019-02-08 Thread Sandybayev, Turar (CAI - Atlanta)
Hi all,

I wonder whether it’s possible to use Flink for the following requirement. We 
need to process a Kinesis stream and based on values in each record, route 
those records to different S3 buckets and keyspaces, with support for batching 
up of files and control over partitioning scheme (so preferably through 
Firehose).

I know it’s straightforward to have a Kinesis source and a Kinesis sink, and 
the hook up Firehose to the sink from AWS, but I need a “fan out” to 
potentially thousands of different buckets, based on content of each event.

Thanks!
Turar




Re: stream of large objects

2019-02-08 Thread Aggarwal, Ajay
Yes, another KeyBy will be used. The “small size” messages will be strings of 
length 500 to 1000.

Is there a concept of “global” state in flink? Is it possible to keep these 
lists in global state and only pass the list reference (by name?) in the 
LargeMessage?


From: Chesnay Schepler 
Date: Friday, February 8, 2019 at 8:45 AM
To: "Aggarwal, Ajay" , "user@flink.apache.org" 

Subject: Re: stream of large objects

Whether a LargeMessage is serialized depends on how the job is structured.
For example, if you were to only apply map/filter functions after the 
aggregation it is likely they wouldn't be serialized.
If you were to apply another keyBy they will be serialized again.

When you say "small size" messages, what are we talking about here?

On 07.02.2019 20:37, Aggarwal, Ajay wrote:
In my use case my source stream contain small size messages, but as part of 
flink processing I will be aggregating them into large messages and further 
processing will happen on these large messages. The structure of this large 
message will be something like this:

   class LargeMessage {
       String key;
       List messages; // this is where the aggregation of smaller messages happens
   }

In some cases this list field of LargeMessage can get very large (1000’s of 
messages). Is it ok to create an intermediate stream of these LargeMessages? 
What should I be concerned about while designing the flink job? Specifically 
with parallelism in mind. As these LargeMessages flow from one flink subtask to 
another, do they get serialized/deserialized ?

Thanks.





Re: Running single Flink job in a job cluster, problem starting JobManager

2019-02-08 Thread Thomas Eckestad
Hi again,

when removing Spring Boot from the application it works.

I would really like to mix Spring Boot and Flink. It does work with Spring Boot 
when submitting jobs to a session cluster, as stated before.

/Thomas

From: Thomas Eckestad 
Sent: Friday, February 8, 2019 12:14 PM
To: user@flink.apache.org
Subject: Running single Flink job in a job cluster, problem starting JobManager

Hi,

I am trying to run a flink job cluster in K8s. As a first step I have created a 
Docker image according to:

https://github.com/apache/flink/blob/release-1.7/flink-container/docker/README.md

When I try to run the image:

docker run --name=flink-job-manager flink-image:latest job-cluster 
--job-classname com.foo.bar.FlinkTest 
-Djobmanager.rpc.address=flink-job-cluster -Dparallelism.default=1 
-Dblob.server.port=6124 -Dqueryable-state.server.ports=6125

the execution fails with the following exception:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'MyFlinkJob': Invocation of init method failed; nested exception is org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:139)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:419)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1737)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:576)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:498)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:846)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
    at com.foo.bar.FlinkTest.main(FlinkTest.java:10)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
    at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:70)
    at com.foo.bar.FlinkJob.MyFlinkJob.init(MyFlinkJob.java:59)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:363)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:307)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:136)
... 22 more

I can successfully run the same job.jar on a session cluster 
(start-cluster.sh;flink run job.jar). Any ideas? Feels like I am missing 
something obvious?

At MyFlinkJob.java:59 I do: streamExecutionEnvironment.execute("MyFlinkJob");

It feels strange that the execution ends up in 

Broadcast state before events stream consumption

2019-02-08 Thread Vadim Vararu
Hi all,

I need to use the broadcast state mechanism 
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html)
 for the next scenario.

I have a reference data stream (slow) and an events stream (fast running), and I
want to do a kind of lookup in the reference stream for each event. The
broadcast state mechanism seems to fit the scenario perfectly.

From the documentation:
As an example where broadcast state can emerge as a natural fit, one can
imagine a low-throughput stream containing a set of rules which we want to
evaluate against all elements coming from another stream.

However, I am not sure what is the correct way to delay the consumption of the 
fast running stream until the slow one is fully read (in case of a file) or 
until a marker is emitted (in case of some other source). Is there any way to 
accomplish that? It doesn't seem to be a rare use case.

Thanks, Vadim.


Flink Standalone cluster - logging problem

2019-02-08 Thread simpleusr
We are using a standalone cluster and submitting jobs through the command line
client.

As stated in
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html
we are editing log4j-cli.properties, but this does not have any effect.

Has anybody seen this before?

Regards






Flink Standalone cluster - production settings

2019-02-08 Thread simpleusr
I know this seems a silly question, but I am trying to figure out the optimal
setup for our Flink jobs.
We are using a standalone cluster with 5 jobs. Each job has 3 async operators
with Executors with thread counts of 20, 20 and 100. The source is Kafka, and
there are Cassandra and REST sinks.
Currently we are using parallelism = 1, so at max load a single job spawns
at least 140 threads. We are also using Netty-based libraries for the Cassandra
and REST calls (and as I can see in the thread dump, Flink also uses a Netty
server). What we see is that the total thread count adds up to ~500 for a
single job.

Suddenly all jobs began to fail in production, and we saw that it was mainly
due to the ulimit on user processes. All jobs had started on one server in the
cluster (I do not know why, as it is a cluster with 3 members), where the limit
was set to around 1500. We then set a higher value and the problems seem to
have gone away.

Can you recommend optimal production settings for a standalone cluster? Or
should there be a max limit on the threads spawned by a single job?

Regards






Flink Standalone cluster - dumps

2019-02-08 Thread simpleusr

We are using a standalone cluster and submitting jobs through the command line
client.

As far as I understand, the job is executed in a task manager, and a single task
manager represents a single JVM, so the dump shows threads from all jobs
bound to that task manager.
Two questions:

1) Is there a special setting so that each task manager is occupied by a
single job? Can setting the slot count to 1 be a workaround?
2) If the option above is not possible, is there any way to get a dump per job?

Regards





Re: stream of large objects

2019-02-08 Thread Chesnay Schepler

Whether a LargeMessage is serialized depends on how the job is structured.
For example, if you were to only apply map/filter functions after the 
aggregation it is likely they wouldn't be serialized.

If you were to apply another keyBy they will be serialized again.

When you say "small size" messages, what are we talking about here?
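
Roughly, the two shapes look like this (a sketch; LargeMessage is simplified from your snippet below and `aggregated` stands for the stream produced by your aggregation):

import org.apache.flink.streaming.api.scala._

case class LargeMessage(key: String, messages: List[String])

// Shape 1: only chained map/filter after the aggregation -- the records stay
// within the same operator chain, so, as described above, they are likely not
// serialized again.
def shape1(aggregated: DataStream[LargeMessage]): DataStream[LargeMessage] =
  aggregated
    .map(m => m)
    .filter(_.messages.nonEmpty)

// Shape 2: another keyBy forces a network shuffle, so every LargeMessage
// (including its potentially very large list) is serialized and deserialized.
def shape2(aggregated: DataStream[LargeMessage]): DataStream[LargeMessage] =
  aggregated
    .keyBy(_.key)
    .reduce((a, b) => LargeMessage(a.key, a.messages ++ b.messages))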

On 07.02.2019 20:37, Aggarwal, Ajay wrote:


In my use case my source stream contain small size messages, but as 
part of flink processing I will be aggregating them into large 
messages and further processing will happen on these large messages. 
The structure of this large message will be something like this:


   class LargeMessage {
       String key;
       List messages; // this is where the aggregation of smaller messages happens
   }

In some cases this list field of LargeMessage can get very large 
(1000’s of messages). Is it ok to create an intermediate stream of 
these LargeMessages? What should I be concerned about while designing 
the flink job? Specifically with parallelism in mind. As these 
LargeMessages flow from one flink subtask to another, do they get 
serialized/deserialized ?


Thanks.





Re: Running JobManager as Deployment instead of Job

2019-02-08 Thread Till Rohrmann
If you keep the same cluster id, the upgraded job should pick up
checkpoints from the completed checkpoint store. However, I would recommend
to take a savepoint and resume from this savepoint because then you can
also specify that you allow non restored state, for example.

Cheers,
Till

On Fri, Feb 8, 2019 at 11:20 AM Vishal Santoshi 
wrote:

> Is the rationale of using a jobID 00* also roughly the same. As in a
> Flink job cluster is a single job and thus a single job id suffices ?  I am
> more wondering about the case when we are doing a compatible changes to a
> job and want to resume ( given we are in HA mode and thus have a
> chroot/subcontext on ZK for the job cluster ) ,  it would make no sense to
> give a brand new job id ?
>
> On Thu, Feb 7, 2019 at 4:42 AM Till Rohrmann  wrote:
>
>> Hi Sergey,
>>
>> the rationale why we are using a K8s job instead of a deployment is that
>> a Flink job cluster should terminate after it has successfully executed the
>> Flink job. This is unlike a session cluster which should run forever and
>> for which a K8s deployment would be better suited.
>>
>> If in your use case a K8s deployment would better work, then I would
>> suggest to change the `job-cluster-job.yaml` accordingly.
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 5, 2019 at 4:12 PM Sergey Belikov 
>> wrote:
>>
>>> Hi,
>>>
>>> my team is currently experimenting with Flink running in Kubernetes (job
>>> cluster setup). And we found out that with JobManager being deployed as
>>> "Job" we can't just simply update certain values in job's yaml, e.g.
>>> spec.template.spec.containers.image (
>>> https://github.com/kubernetes/kubernetes/issues/48388#issuecomment-319493817).
>>> This causes certain troubles in our CI/CD pipelines so we are thinking
>>> about using "Deployment" instead of "Job".
>>>
>>> With that being said I'm wondering what was the motivation behind using
>>> "Job" resource for deploying JobManager? And are there any pitfalls related
>>> to using Deployment and not Job for JobManager?
>>>
>>> Thank you in advance.
>>> --
>>> Best regards,
>>> Sergey Belikov
>>>
>>


Reduce one event under multiple keys

2019-02-08 Thread Stephen Connolly
Ok, I'll try and map my problem into something that should be familiar to
most people.

Consider a collection of PCs, each of which has a unique ID, e.g.
ca:fe:ba:be, de:ad:be:ef, etc.

Each PC has a tree of local files. Some of the file paths are
coincidentally the same names, but there is no file sharing between PCs.

I need to produce metrics about how often files are opened and how long
they are open for.

I need, for every X-minute tumbling window, not just the cumulative averages
for each PC, but also the averages for each file as well as the cumulative
averages for each folder and their sub-folders.

I have a stream of events like

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
{"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User guide.txt","duration":"97"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin guide.txt","duration":"196"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
{"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}

So from that I would like to know stuff like:

ca:fe:ba:be had 4/X opens per minute in the X minute window
ca:fe:ba:be had 3/X closes per minute in the X minute window and the
average time open was (67+97+196)/3=120... there is no guarantee that the
closes will be matched with opens in the same window, which is why I'm only
tracking them separately
de:ad:be:ef had 2/X opens per minute in the X minute window
ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and the
average time open was 120
de:ad:be:ef /foo had 1/X opens per minute in the X minute window
de:ad:be:ef /bar had 1/X opens per minute in the X minute window
de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X minute
window
de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X minute
window
etc

What I think I want to do is turn each event into a series of events with
different keys, so that

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}

gets sent under the keys:

("ca:fe:ba:be","/")
("ca:fe:ba:be","/foo")
("ca:fe:ba:be","/foo/bar")
("ca:fe:ba:be","/foo/bar/README.txt")

Then I could use a window aggregation function to just:

* count the "open" events
* count the "close" events and sum their duration

Additionally, I am (naïvely) hoping that if a window has no events for a
particular key, the memory/storage costs are zero for that key.
From what I can see, to achieve what I am trying to do, I could use a
flatMap followed by a keyBy.

In other words, I take the events and flat-map them based on the path split
on '/', returning a tuple of the (to-be) key and the event. Then I can use
keyBy to key on tuple field 0.
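
Concretely, the flatMap I have in mind looks something like this (a sketch; FileEvent is a made-up case class standing in for the parsed JSON events above):

import org.apache.flink.streaming.api.scala._

case class FileEvent(source: String, action: String, path: String, duration: Option[Long])

// "/foo/bar/README.txt" -> Seq("/", "/foo", "/foo/bar", "/foo/bar/README.txt")
def keysFor(path: String): Seq[String] = {
  val parts = path.split('/').filter(_.nonEmpty)
  parts.scanLeft("")((prefix, part) => prefix + "/" + part)
       .map(p => if (p.isEmpty) "/" else p)
}

// Emit each event once per ancestor path, keyed by (source, path prefix), so a
// single window aggregation produces the per-PC, per-folder and per-file figures.
def fanOut(events: DataStream[FileEvent]): KeyedStream[((String, String), FileEvent), (String, String)] =
  events
    .flatMap(e => keysFor(e.path).map(prefix => ((e.source, prefix), e)))
    .keyBy(_._1)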

My ask:

Is the above design a good design? How would you achieve the end game
better? Do I need to worry about many paths that are accessed rarely and
would have an accumulator that stays at 0 unless there are events
in that window... or are the accumulators for each distinct key eagerly
purged after each fire trigger?

What gotchas do I need to look out for?

Thanks in advance, and apologies for the length.

-stephenc


Running single Flink job in a job cluster, problem starting JobManager

2019-02-08 Thread Thomas Eckestad
Hi,

I am trying to run a flink job cluster in K8s. As a first step I have created a 
Docker image according to:

https://github.com/apache/flink/blob/release-1.7/flink-container/docker/README.md

When I try to run the image:

docker run --name=flink-job-manager flink-image:latest job-cluster 
--job-classname com.foo.bar.FlinkTest 
-Djobmanager.rpc.address=flink-job-cluster -Dparallelism.default=1 
-Dblob.server.port=6124 -Dqueryable-state.server.ports=6125

the execution fails with the following exception:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'MyFlinkJob': Invocation of init method failed; nested exception is org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:139)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:419)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1737)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:576)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:498)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:846)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
    at com.foo.bar.FlinkTest.main(FlinkTest.java:10)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
    at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:70)
    at com.foo.bar.FlinkJob.MyFlinkJob.init(MyFlinkJob.java:59)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:363)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:307)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:136)
... 22 more

I can successfully run the same job.jar on a session cluster 
(start-cluster.sh;flink run job.jar). Any ideas? Feels like I am missing 
something obvious?

At MyFlinkJob.java:59 I do: streamExecutionEnvironment.execute("MyFlinkJob");

It feels strange that the execution ends up in 
org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute?

From
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java:

/**
 * A special {@link StreamExecutionEnvironment} that is used in the web 
frontend when generating
 * a user-inspectable graph of a streaming job.
 */
@PublicEvolving
public class StreamPlanEnvironment extends StreamExecutionEnvironment {



Re: Running JobManager as Deployment instead of Job

2019-02-08 Thread Vishal Santoshi
Is the rationale of using a jobID 00* also roughly the same. As in a
Flink job cluster is a single job and thus a single job id suffices ?  I am
more wondering about the case when we are doing a compatible changes to a
job and want to resume ( given we are in HA mode and thus have a
chroot/subcontext on ZK for the job cluster ) ,  it would make no sense to
give a brand new job id ?

On Thu, Feb 7, 2019 at 4:42 AM Till Rohrmann  wrote:

> Hi Sergey,
>
> the rationale why we are using a K8s job instead of a deployment is that a
> Flink job cluster should terminate after it has successfully executed the
> Flink job. This is unlike a session cluster which should run forever and
> for which a K8s deployment would be better suited.
>
> If in your use case a K8s deployment would better work, then I would
> suggest to change the `job-cluster-job.yaml` accordingly.
>
> Cheers,
> Till
>
> On Tue, Feb 5, 2019 at 4:12 PM Sergey Belikov 
> wrote:
>
>> Hi,
>>
>> my team is currently experimenting with Flink running in Kubernetes (job
>> cluster setup). And we found out that with JobManager being deployed as
>> "Job" we can't just simply update certain values in job's yaml, e.g.
>> spec.template.spec.containers.image (
>> https://github.com/kubernetes/kubernetes/issues/48388#issuecomment-319493817).
>> This causes certain troubles in our CI/CD pipelines so we are thinking
>> about using "Deployment" instead of "Job".
>>
>> With that being said I'm wondering what was the motivation behind using
>> "Job" resource for deploying JobManager? And are there any pitfalls related
>> to using Deployment and not Job for JobManager?
>>
>> Thank you in advance.
>> --
>> Best regards,
>> Sergey Belikov
>>
>


Dataset statistics

2019-02-08 Thread Flavio Pompermaier
Hi to all,
is there any effort to standardize descriptive statistics in Apache Flink?
Is there any suggested way to achieve this?

Best,
Flavio
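
To make the question concrete, this is the kind of thing I am currently hand-rolling (a sketch only, not a standard Flink facility as far as I know):

import org.apache.flink.api.scala._

case class Stats(count: Long, sum: Double, min: Double, max: Double) {
  def mean: Double = if (count == 0) Double.NaN else sum / count
}

// Single-pass descriptive statistics over a DataSet[Double] via one reduce.
def describe(ds: DataSet[Double]): DataSet[Stats] =
  ds.map(v => Stats(1L, v, v, v))
    .reduce((a, b) => Stats(a.count + b.count, a.sum + b.sum,
                            math.min(a.min, b.min), math.max(a.max, b.max)))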


Re: Flink Job and Watermarking

2019-02-08 Thread Chesnay Schepler
Have you considered using the metric system to access the current 
watermarks for each operator? (see 
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#io)


On 08.02.2019 03:19, Kaustubh Rudrawar wrote:

Hi,

I'm writing a job that wants to make an HTTP request once a watermark 
has reached all tasks of an operator. It would be great if this could 
be determined from outside the Flink job, but I don't think it's 
possible to access watermark information for the job as a whole. Below 
is a workaround I've come up with:


 1. Read messages from Kafka using the provided KafkaSource. Event
time will be defined as a timestamp within the message.
 2. Key the stream based on an id from the message.
 3. DedupOperator that dedupes messages. This operator will run with a
parallelism of N.
 4. An operator that persists the messages to S3. It doesn't need to
output anything - it should ideally be a Sink (if it were a sink
we could use the StreamingFileSink).
 5. Implement an operator that will make an HTTP request once
processWatermark is called for time T. A parallelism of 1 will be
used for this operator as it will do very little work. Because it
has a parallelism of 1, the operator in step 4 cannot send
anything to it as it could become a throughput bottleneck.

Does this implementation seem like a valid workaround? Any other 
alternatives I should consider?


Thanks for your help,
Kaustubh






Sliding window buffering on restart without save point

2019-02-08 Thread shater93
Hello,

I have a Flink pipeline processing data in several overlapping (sliding)
windows such that they span [t_i, t_i + T], where t_i is the window starting
time and T is the window size. The overlap is such that t_(i+1) - t_i = T/6
(i.e. for every window length there are 6 overlapping windows).

When deploying to Kubernetes in my CI/CD process, there are sometimes
serialisation problems due to changes of the Flink DAG, checkpointed state, etc.,
as a symptom of, for instance, a change in the definition of these classes
(adding/removing a field). This means the process cannot start from the
savepoint that I take during a deploy. How could this be managed in an
efficient way? I understand that the way I am using windowing is not optimal
here, so let's not focus on those solutions.

Currently, my only approach is:
* Shut down the streaming process in a controlled manner (replacing the
running version with new configs, terminating the stream when events
arrive after a certain time-point).
* After termination, move the time-point (the offset; I am using Kafka)
backwards in time, in this case by T + eps, to allow re-buffering of the windows.
* Start the service reading from the new time-point, but do not emit any
output events until it has passed a defined time-point (in this case the
time-point of termination).

Do you have any suggestions on how to improve this process?

Best regards and thanks in advance for any input,
William
 

 Flink Version: 1.6.2


