Re: Reinterpreting a pre-partitioned data stream as keyed stream

2020-01-29 Thread Guowei Ma
Hi, Krzysztof


When you use *reinterpretAsKeyedStream* you yourself must guarantee that the
data is partitioned exactly the way Flink's keyBy would partition it. But before
going any further I think we should check whether the normal DataStream API can
satisfy your requirements without using *reinterpretAsKeyedStream*.
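A minimal sketch of what that looks like in code (Event, alreadyPartitioned and
MyKeyedProcessFunction are placeholders; it assumes the incoming stream is already
partitioned exactly as keyBy(Event::getKey) would partition it):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamUtils;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    // Reinterpret the pre-partitioned stream as a keyed stream without a shuffle.
    // This only yields correct results if the upstream partitioning really matches
    // what keyBy(Event::getKey) would produce.
    KeyedStream<Event, String> keyed =
            DataStreamUtils.reinterpretAsKeyedStream(alreadyPartitioned, Event::getKey);

    keyed.process(new MyKeyedProcessFunction());  // keyed state and timers work as usual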


An operator can send its output to another operator in two ways:
one-to-one (forward) or redistributing [1]. With a one-to-one (forward) connection,
the partitioning and the order of the events stay the same across the two operators.
Two operators are connected with forwarding by default if they have the same
parallelism.

Without the full details, I think you could probably just *keyBy* once if your
job has no special needs. Or you could share what your job looks like, if that
is convenient.



[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/programming-model.html#parallel-dataflows

Best,
Guowei


KristoffSC  wrote on Tue, Jan 28, 2020 at 10:47 PM:

> Hi all,
> we have a use case where the order of received events matters and it should be
> kept across the pipeline.
>
> Our pipeline would be parallelized. We can key the stream just after the Source
> operator, but in order to keep the ordering in the next operators we would
> have to keep the stream keyed.
>
> Obviously we could key again and again, but this would cause some
> performance penalty.
> We were thinking about using DataStreamUtils.reinterpretAsKeyedStream
> instead.
>
> Since this is an experimental functionality I would like to ask if there is
> someone among the community that is using this feature? Do we know about
> any
> open issues regarding this feature?
>
> Thanks,
> Krzysztof
>
>
>
>
>
>
>
>
>


Cypher support for flink graphs?

2020-01-29 Thread kant kodali
Hi All,

Can we expect open cypher support for Flink graphs?

Thanks!


Re: Cypher support for flink graphs?

2020-01-29 Thread Flavio Pompermaier
That would be awesome, but I don't know of any Cypher support. You'd better go
with Morpheus [1] for that.

[1] https://github.com/opencypher/morpheus

On Wed, Jan 29, 2020 at 10:28 AM kant kodali  wrote:

> Hi All,
>
> Can we expect open cypher support for Flink graphs?
>
> Thanks!
>


ActiveMQ connector

2020-01-29 Thread OskarM
Hi all,

I am using Flink with Bahir's Apache ActiveMQ connector. However, it's quite
dated and has many limitations; most notably, the source supports only
ByteMessages, does not support parallelism, and has a bug that is only fixed
in a snapshot version.

So I started implementing my own SourceFunction (still with a parallelism of
only 1) based on AMQSource.
I want it to support Flink's checkpointing and make it work with ActiveMQ
acks.
AMQSource uses an ordinary HashMap to store the Messages that still need to be
acked at the broker, and this is where my question arises.

Is the HashMap safe to use here?

Please correct me if I'm wrong, but my understanding is that the /run/ method is
executed in one thread and /acknowledgeIDs/ in another, so there is a
possibility of a race condition (even if we assume all the message IDs are
unique).
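
If that is the case, one option is to guard the shared buffer with an explicit lock.
The skeleton below is only a hypothetical sketch (class and method names are made
up; it is not the Bahir AMQSource) showing the pattern of synchronizing the reading
thread and the acknowledging thread on the same monitor:

    import java.util.ArrayDeque;
    import java.util.Deque;

    import javax.jms.Message;

    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

    // Hypothetical skeleton: the pending-ack buffer is guarded by one lock because
    // run() and the checkpoint-completion callback may run on different threads.
    public abstract class GuardedAckSource<OUT> extends RichSourceFunction<OUT> {

        private final Object ackLock = new Object();
        private final Deque<Message> pendingAcks = new ArrayDeque<>();

        // called from the run() thread after a message was emitted downstream
        protected void bufferForAck(Message msg) {
            synchronized (ackLock) {
                pendingAcks.add(msg);
            }
        }

        // called from the checkpoint-completion path once the checkpoint succeeded
        protected void ackPending() throws Exception {
            synchronized (ackLock) {
                for (Message msg : pendingAcks) {
                    msg.acknowledge();
                }
                pendingAcks.clear();
            }
        }
    }

A ConcurrentHashMap keyed by message ID would be another way to avoid the race, as
long as the acknowledging side works on a consistent view of the pending entries.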

Also, do you know of any ActiveMQ specific (or JMS in general), more
up-to-date connectors I could use which do not have the issues mentioned
above?

Thanks,
Oskar





Re: How to debug a job stuck in a deployment/run loop?

2020-01-29 Thread Till Rohrmann
Hi Jason,

getting access to the log files would help most to figure out what's going
wrong.

Cheers,
Till

On Tue, Jan 28, 2020 at 9:08 AM Arvid Heise  wrote:

> Hi Jason,
>
> could you describe your topology? Are you writing to Kafka? Are you using
> exactly once? Are you seeing any warning?
> If so, one thing that immediately comes to my mind is
> transaction.max.timeout.ms. If the value in flink (by default 1h) is
> higher than what the Kafka brokers support, it may run into indefinite
> restart loops in rare cases.
>
> "Kafka brokers by default have transaction.max.timeout.ms set to 15
> minutes. This property will not allow to set transaction timeouts for the
> producers larger than it’s value. FlinkKafkaProducer011 by default sets
> the transaction.timeout.ms property in producer config to 1 hour, thus
> transaction.max.timeout.ms should be increased before using the
> Semantic.EXACTLY_ONCE mode."
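
A minimal sketch of the corresponding producer configuration (topic name, broker
address and the 15-minute value are assumptions, and constructor signatures vary
slightly across Flink versions; this would live inside the job's main method):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

    // Keep Flink's transaction timeout at or below the broker's transaction.max.timeout.ms.
    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", "broker:9092");
    producerProps.setProperty("transaction.timeout.ms", "900000"); // 15 minutes

    FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
            "my-topic",
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            producerProps,
            FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);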
>
> Best,
>
> Arvid
>
> On Fri, Jan 24, 2020 at 2:47 AM Jason Kania  wrote:
>
>> I am attempting to migrate from 1.7.1 to 1.9.1 and I have hit a problem
>> where previously working jobs can no longer launch after being submitted.
>> In the UI, the submitted jobs show up as deploying for a period, then go
>> into a run state before returning to the deploy state and this repeats
>> regularly with the job bouncing between states. No exceptions or errors are
>> visible in the logs. There is no data coming in for the job to process and
>> the kafka queues are empty.
>>
>> If I look at the thread activity of the task manager running the job in
>> top, I see that the busiest threads are flink-akka threads, sometimes
>> jumping to very high CPU numbers. That is all I have for info.
>>
>> Any suggestions on how to debug this? I can set break points and connect
>> if that helps, just not sure at this point where to start.
>>
>> Thanks,
>>
>> Jason
>>
>


Re: Is there anything strictly special about sink functions?

2020-01-29 Thread Till Rohrmann
As far as I know you don't have to define a sink in order to define a valid
Flink program (using Flink >= 1.9). Your topology can simply end in a map
function and it should be executable once you call env.execute().

Cheers,
Till
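
For completeness, a minimal runnable sketch of the pattern Arvid describes below
(do the work in an ordinary operator and terminate the job graph with a no-op
sink); the pipeline contents are placeholders:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

    public class DiscardingSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements(1, 2, 3)
               .map(i -> i * 2)                   // side-effecting work would live here
               .returns(Types.INT)                // help type extraction for the lambda
               .addSink(new DiscardingSink<>());  // explicit no-op sink, nothing is emitted

            env.execute("discarding-sink-example");
        }
    }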

On Tue, Jan 28, 2020 at 10:06 AM Arvid Heise  wrote:

> As Konstantin said, you need to use a sink, but you could use
> `org.apache.flink.streaming.api.functions.sink.DiscardingSink`.
>
> There is nothing inherently wrong with outputting things through a UDF.
> You need to solve the same challenges as in a SinkFunction: you need to
> implement your own state management. Also make sure that you can handle
> duplicates occurring during recovery after a restart.
>
> On Tue, Jan 28, 2020 at 6:43 AM Konstantin Knauf 
> wrote:
>
>> Hi Andrew,
>>
>> as far as I know there is nothing particularly special about the sink in
>> terms of how it handles state or time. You can not leave the pipeline
>> "unfinished", only sinks trigger the execution of the whole pipeline.
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>>
>> On Fri, Jan 24, 2020 at 5:59 PM Andrew Roberts  wrote:
>>
>>> Hello,
>>>
>>> I’m trying to push some behavior that we’ve currently got in a large,
>>> stateful SinkFunction implementation into Flink’s windowing system. The
>>> task at hand is similar to what StreamingFileSink provides, but more
>>> flexible. I don’t want to re-implement that sink, because it uses the
>>> StreamingRuntimeContext.getProcessingTimeService() via a cast - that class
>>> is marked as internal, and I’d like to avoid the exposure to an interface
>>> that could change. Extending it similarly introduces complexity I would
>>> rather not add to our codebase.
>>>
>>> WindowedStream.process() provides more or less the pieces I need, but
>>> the stream continues on after a ProcessFunction - there’s no way to
>>> process() directly into a sink. I could use a ProcessFunction[In, Unit,
>>> Key, Window], and follow that immediately with a no-op sink that discards
>>> the Unit values, or I could just leave the stream “unfinished," with no
>>> sink.
>>>
>>> Is there a downside to either of these approaches? Is there anything
>>> special about doing sink-like work in a ProcessFunction or FlatMapFunction
>>> instead of a SinkFunction?
>>>
>>> Thanks,
>>>
>>> Andrew
>>>
>>>
>>>
>>>
>>
>>
>> --
>>
>> Konstantin Knauf | Solutions Architect
>>
>> +49 160 91394525
>>
>>
>> Follow us @VervericaData Ververica 
>>
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Tony) Cheng
>>
>


Re: Blocking KeyedCoProcessFunction.processElement1

2020-01-29 Thread Till Rohrmann
Yes, using AsyncIO could help in the case of blocking operations.

Cheers,
Till
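
A hypothetical sketch of what moving the blocking call into async I/O could look
like (the class name and the database work are placeholders, not Alexey's actual
code):

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    // The blocking database work runs on a separate thread pool, so the main task
    // thread stays free and checkpoint barriers are not held up.
    public class DbWriteAsyncFunction extends RichAsyncFunction<String, String> {

        @Override
        public void asyncInvoke(String value, ResultFuture<String> resultFuture) {
            CompletableFuture
                .supplyAsync(() -> {
                    // blocking JDBC write / partition maintenance would go here
                    return value;
                })
                .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
        }
    }

It would then be wired in with something like
AsyncDataStream.unorderedWait(input, new DbWriteAsyncFunction(), 30, TimeUnit.SECONDS, 100).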

On Tue, Jan 28, 2020 at 10:45 AM Taher Koitawala  wrote:

> Would AsyncIO operator not be an option for you to connect to RDBMS?
>
> On Tue, Jan 28, 2020, 12:45 PM Alexey Trenikhun  wrote:
>
>> Thank you Yun Tang.
>> My implementation could potentially block for a significant amount of time,
>> because I wanted to do RDBMS maintenance (create partitions for new data,
>> purge old data, etc.) in-line with writing the stream data to a database.
>>
>> --
>> *From:* Yun Tang 
>> *Sent:* Sunday, January 26, 2020 8:42:37 AM
>> *To:* Alexey Trenikhun ; user@flink.apache.org <
>> user@flink.apache.org>
>> *Subject:* Re: Blocking KeyedCoProcessFunction.processElement1
>>
>> Hi Alexey
>>
>> Actually, I don't understand why you think
>> KeyedCoProcessFunction#processElement1 would block for a significant amount
>> of time; it just processes records from the first input stream,
>> which is necessary. If you really find it blocks for a long time, I
>> think that's because your processing logic has some problem and gets stuck. On
>> the other hand, since checkpoint processing and record processing hold the same
>> lock, we cannot process a checkpoint while the record-processing logic has not
>> released the lock.
>>
>> Best
>> Yun Tang
>> --
>> *From:* Alexey Trenikhun 
>> *Sent:* Thursday, January 23, 2020 13:04
>> *To:* user@flink.apache.org 
>> *Subject:* Blocking KeyedCoProcessFunction.processElement1
>>
>>
>> Hello,
>> If KeyedCoProcessFunction.processElement1 blocks for significant amount
>> of time, will it prevent checkpoint ?
>>
>> Thanks,
>> Alexey
>>
>


Re: Is there anything strictly special about sink functions?

2020-01-29 Thread Andrew Roberts
Can I expect checkpointing to behave normally without a sink, or do sink
functions invoke some special behavior?

My hope is that sinks are isomorphic to FlatMap[A, Nothing], but it’s a 
challenge to verify all the bits of behavior observationally. 

Thanks for all your help!

> On Jan 29, 2020, at 7:58 AM, Till Rohrmann  wrote:
> 
> 
> As far as I know you don't have to define a sink in order to define a valid 
> Flink program (using Flink >= 1.9). Your topology can simply end in a map 
> function and it should be executable once you call env.execute().
> 
> Cheers,
> Till
> 
>> On Tue, Jan 28, 2020 at 10:06 AM Arvid Heise  wrote:
>> As Konstantin said, you need to use a sink, but you could use 
>> `org.apache.flink.streaming.api.functions.sink.DiscardingSink`. 
>> 
>> There is nothing inherently wrong with outputting things through a UDF. You 
>> need to solve the same challenges as in a SinkFunction: you need to 
>> implement your own state management. Also make sure that you can handle 
>> duplicates occurring during recovery after a restart.
>> 
>>> On Tue, Jan 28, 2020 at 6:43 AM Konstantin Knauf  
>>> wrote:
>>> Hi Andrew, 
>>> 
>>> as far as I know there is nothing particularly special about the sink in 
>>> terms of how it handles state or time. You can not leave the pipeline 
>>> "unfinished", only sinks trigger the execution of the whole pipeline.
>>> 
>>> Cheers, 
>>> 
>>> Konstantin
>>> 
>>> 
>>> 
 On Fri, Jan 24, 2020 at 5:59 PM Andrew Roberts  wrote:
 Hello,
 
 I’m trying to push some behavior that we’ve currently got in a large, 
 stateful SinkFunction implementation into Flink’s windowing system. The 
 task at hand is similar to what StreamingFileSink provides, but more 
 flexible. I don’t want to re-implement that sink, because it uses the 
 StreamingRuntimeContext.getProcessingTimeService() via a cast - that class 
 is marked as internal, and I’d like to avoid the exposure to an interface 
 that could change. Extending it similarly introduces complexity I would 
 rather not add to our codebase.
 
 WindowedStream.process() provides more or less the pieces I need, but the 
 stream continues on after a ProcessFunction - there’s no way to process() 
 directly into a sink. I could use a ProcessFunction[In, Unit, Key, 
 Window], and follow that immediately with a no-op sink that discards the 
 Unit values, or I could just leave the stream “unfinished," with no sink.
 
 Is there a downside to either of these approaches? Is there anything 
 special about doing sink-like work in a ProcessFunction or FlatMapFunction 
 instead of a SinkFunction?
 
 Thanks,
 
 Andrew
 
 
 
>>> 
>>> 
>>> -- 
>>> Konstantin Knauf | Solutions Architect
>>> +49 160 91394525
>>> 
>>> Follow us @VervericaData Ververica
>>> 
>>> --
>>> Join Flink Forward - The Apache Flink Conference
>>> Stream Processing | Event Driven | Real Time
>>> --
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji 
>>> (Tony) Cheng



Re: Flink distribution housekeeping for YARN sessions

2020-01-29 Thread Till Rohrmann
Hi Theo,

your assumption is correct that Flink won't clean up its files when using
`yarn application -kill ID`. This should also hold true for other temporary
files generated by Flink's Blob service, shuffle service and io manager.
These files are usually stored under /tmp and should be cleaned up
eventually, though.

I think a better approach is to reconnect to the Flink Yarn session cluster
and then issue the "stop" command. You can either do it via
`bin/yarn-session.sh -id APP_ID` and then type "stop" or you do `echo
"stop" | bin/yarn-session.sh -id APP_ID`.

I think we should also update the logging statements of the yarn-session.sh
which say that you should use `yarn application -kill` in order to stop the
process.

Cheers,
Till

On Tue, Jan 28, 2020 at 6:21 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi there,
>
> Today I realized that we currently have a lot of un-housekept Flink
> distribution jar files and would like to know what to do about this, i.e.
> how to properly housekeep them.
>
> In the job submitting HDFS home directory, I find a subdirectory called
> `.flink` with hundreds of subfolders like `application_1573731655031_0420`,
> having the following structure:
>
> -rw-r--r--   3 dev dev861 2020-01-27 21:17
> /user/dev/.flink/application_1580155950981_0010/4797ff6e-853b-460c-81b3-34078814c5c9-taskmanager-conf.yaml
> -rw-r--r--   3 dev dev691 2020-01-27 21:16
> /user/dev/.flink/application_1580155950981_0010/application_1580155950981_0010-flink-conf.yaml2755466919863419496.tmp
> -rw-r--r--   3 dev dev861 2020-01-27 21:17
> /user/dev/.flink/application_1580155950981_0010/fdb5ef57-c140-4f6d-9791-c226eb1438ce-taskmanager-conf.yaml
> -rw-r--r--   3 dev dev 92.2 M 2020-01-27 21:16
> /user/dev/.flink/application_1580155950981_0010/flink-dist_2.11-1.9.1.jar
> drwxr-xr-x   - dev dev  0 2020-01-27 21:16
> /user/dev/.flink/application_1580155950981_0010/lib
> -rw-r--r--   3 dev dev  2.6 K 2020-01-27 21:16
> /user/dev/.flink/application_1580155950981_0010/log4j.properties
> -rw-r--r--   3 dev dev  2.3 K 2020-01-27 21:16
> /user/dev/.flink/application_1580155950981_0010/logback.xml
> drwxr-xr-x   - dev dev  0 2020-01-27 21:16
> /user/dev/.flink/application_1580155950981_0010/plugins
>
> With tons of those folders (one for each Flink session we
> launched/killed in our CI/CD pipeline), they sum up to some terabytes of
> used space in our HDFS.
> I suppose I kill our Flink sessions wrongly. We start and stop sessions
> and jobs separately like so:
>
> Start:
>
> ${OS_ROOT}/flink/bin/yarn-session.sh -jm 4g -tm 32g --name 
> "${FLINK_SESSION_NAME}" -d -Denv.java.opts="-XX:+HeapDumpOnOutOfMemoryError"
>
> ${OS_ROOT}/flink/bin/flink run -m ${FLINK_HOST} [..savepoint/checkpoint 
> options...] -d -n "${JOB_JAR}" $*
>
> Stop
>
> ${OS_ROOT}/flink/bin/flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME} -m 
> ${FLINK_HOST} ${ID}
>
> yarn application -kill "${ID}"
>
>
> yarn application -kill was the best I could find, as the Flink docs only state
> that the Linux session process should be closed ("Stop the YARN session by
> stopping the unix process (using CTRL+C) or by entering ‘stop’ into the
> client.").
>
> Now my question: Is there a more elegant way to kill a yarn session
> (remotely from some host in the cluster, not necessarily the one starting
> the detached session), which also does the housekeeping then? Or should I
> do the housekeeping myself manually? (Pretty easy to script). Do I need to
> expect any more side effects when killing the session with "yarn
> application -kill"?
>
> Best regards
> Theo
>
> --
> SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln
> Theo Diefenthal
>
> T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575
> theo.diefent...@scoop-software.de - www.scoop-software.de
> Sitz der Gesellschaft: Köln, Handelsregister: Köln,
> Handelsregisternummer: HRB 36625
> Geschäftsführung: Dr. Oleg Balovnev, Frank Heinen,
> Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel
>


Re: Flink distribution housekeeping for YARN sessions

2020-01-29 Thread Till Rohrmann
Here is the corresponding JIRA ticket:
https://issues.apache.org/jira/browse/FLINK-15806

On Wed, Jan 29, 2020 at 3:16 PM Till Rohrmann  wrote:

> Hi Theo,
>
> your assumption is correct that Flink won't clean up its files when using
> `yarn application -kill ID`. This should also hold true for other temporary
> files generated by Flink's Blob service, shuffle service and io manager.
> These files are usually stored under /tmp and should be cleaned up
> eventually, though.
>
> I think a better approach is to reconnect to the Flink Yarn session
> cluster and then issue the "stop" command. You can either do it via
> `bin/yarn-session.sh -id APP_ID` and then type "stop" or you do `echo
> "stop" | bin/yarn-session.sh -id APP_ID`.
>
> I think we should also update the logging statements of the
> yarn-session.sh which say that you should use `yarn application -kill` in
> order to stop the process.
>
> Cheers,
> Till
>
> On Tue, Jan 28, 2020 at 6:21 PM Theo Diefenthal <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi there,
>>
>> Today I realized that we currently have a lot of not housekept flink
>> distribution jar files and would like to know what to do about this, i.e.
>> how to proper housekeep them.
>>
>> In the job submitting HDFS home directory, I find a subdirectory called
>> `.flink` with hundreds of subfolders like `application_1573731655031_0420`,
>> having the following structure:
>>
>> -rw-r--r--   3 dev dev861 2020-01-27 21:17
>> /user/dev/.flink/application_1580155950981_0010/4797ff6e-853b-460c-81b3-34078814c5c9-taskmanager-conf.yaml
>> -rw-r--r--   3 dev dev691 2020-01-27 21:16
>> /user/dev/.flink/application_1580155950981_0010/application_1580155950981_0010-flink-conf.yaml2755466919863419496.tmp
>> -rw-r--r--   3 dev dev861 2020-01-27 21:17
>> /user/dev/.flink/application_1580155950981_0010/fdb5ef57-c140-4f6d-9791-c226eb1438ce-taskmanager-conf.yaml
>> -rw-r--r--   3 dev dev 92.2 M 2020-01-27 21:16
>> /user/dev/.flink/application_1580155950981_0010/flink-dist_2.11-1.9.1.jar
>> drwxr-xr-x   - dev dev  0 2020-01-27 21:16
>> /user/dev/.flink/application_1580155950981_0010/lib
>> -rw-r--r--   3 dev dev  2.6 K 2020-01-27 21:16
>> /user/dev/.flink/application_1580155950981_0010/log4j.properties
>> -rw-r--r--   3 dev dev  2.3 K 2020-01-27 21:16
>> /user/dev/.flink/application_1580155950981_0010/logback.xml
>> drwxr-xr-x   - dev dev  0 2020-01-27 21:16
>> /user/dev/.flink/application_1580155950981_0010/plugins
>>
>> With having tons of those folders (For each flink session we
>> launched/killed in our CI CD pipeline), they sum up to some terrabytes in
>> our HDFS in used space.
>> I suppose, I kill our flink sessions wrongly. We start and stop sessions
>> and and jobs separately like so:
>>
>> Start:
>>
>> ${OS_ROOT}/flink/bin/yarn-session.sh -jm 4g -tm 32g --name 
>> "${FLINK_SESSION_NAME}" -d -Denv.java.opts="-XX:+HeapDumpOnOutOfMemoryError"
>>
>> ${OS_ROOT}/flink/bin/flink run -m ${FLINK_HOST} [..savepoint/checkpoint 
>> options...] -d -n "${JOB_JAR}" $*
>>
>> Stop
>>
>> ${OS_ROOT}/flink/bin/flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME} -m 
>> ${FLINK_HOST} ${ID}
>>
>> yarn application -kill "${ID}"
>>
>>
>> yarn application -kill was the best I could find as the flink docu
>> states, the linux session process should only be closed (" Stop the YARN
>> session by stopping the unix process (using CTRL+C) or by entering ‘stop’
>> into the client.").
>>
>> Now my question: Is there a more elegant way to kill a yarn session
>> (remotely from some host in the cluster, not necessarily the one starting
>> the detached session), which also does the housekeeping then? Or should I
>> do the housekeeping myself manually? (Pretty easy to script). Do I need to
>> expect any more side effects when killing the session with "yarn
>> application -kill"?
>>
>> Best regards
>> Theo
>>
>> --
>> SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln
>> Theo Diefenthal
>>
>> T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575
>> theo.diefent...@scoop-software.de - www.scoop-software.de
>> Sitz der Gesellschaft: Köln, Handelsregister: Köln,
>> Handelsregisternummer: HRB 36625
>> Geschäftsführung: Dr. Oleg Balovnev, Frank Heinen,
>> Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel
>>
>


Re: Is there anything strictly special about sink functions?

2020-01-29 Thread Till Rohrmann
Yes, checkpointing should behave normally without a sink. If I am not
mistaken, then sinks should indeed be isomorphic to FlatMap[A, Nothing].
However, there is no guarantee that this will always stay like this.

Cheers,
Till

On Wed, Jan 29, 2020 at 2:53 PM Andrew Roberts  wrote:

> Can I expect checkpointing to behave normally without a sink, or do sink
> functions Invoke some special behavior?
>
> My hope is that sinks are isomorphic to FlatMap[A, Nothing], but it’s a
> challenge to verify all the bits of behavior observationally.
>
> Thanks for all your help!
>
> On Jan 29, 2020, at 7:58 AM, Till Rohrmann  wrote:
>
> 
> As far as I know you don't have to define a sink in order to define a
> valid Flink program (using Flink >= 1.9). Your topology can simply end in a
> map function and it should be executable once you call env.execute().
>
> Cheers,
> Till
>
> On Tue, Jan 28, 2020 at 10:06 AM Arvid Heise  wrote:
>
>> As Konstantin said, you need to use a sink, but you could use
>> `org.apache.flink.streaming.api.functions.sink.DiscardingSink`.
>>
>> There is nothing inherently wrong with outputting things through a UDF.
>> You need to solve the same challenges as in a SinkFunction: you need to
>> implement your own state management. Also make sure that you can handle
>> duplicates occurring during recovery after a restart.
>>
>> On Tue, Jan 28, 2020 at 6:43 AM Konstantin Knauf <
>> konstan...@ververica.com> wrote:
>>
>>> Hi Andrew,
>>>
>>> as far as I know there is nothing particularly special about the sink in
>>> terms of how it handles state or time. You can not leave the pipeline
>>> "unfinished", only sinks trigger the execution of the whole pipeline.
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>>
>>>
>>> On Fri, Jan 24, 2020 at 5:59 PM Andrew Roberts 
>>> wrote:
>>>
 Hello,

 I’m trying to push some behavior that we’ve currently got in a large,
 stateful SinkFunction implementation into Flink’s windowing system. The
 task at hand is similar to what StreamingFileSink provides, but more
 flexible. I don’t want to re-implement that sink, because it uses the
 StreamingRuntimeContext.getProcessingTimeService() via a cast - that class
 is marked as internal, and I’d like to avoid the exposure to an interface
 that could change. Extending it similarly introduces complexity I would
 rather not add to our codebase.

 WindowedStream.process() provides more or less the pieces I need, but
 the stream continues on after a ProcessFunction - there’s no way to
 process() directly into a sink. I could use a ProcessFunction[In, Unit,
 Key, Window], and follow that immediately with a no-op sink that discards
 the Unit values, or I could just leave the stream “unfinished," with no
 sink.

 Is there a downside to either of these approaches? Is there anything
 special about doing sink-like work in a ProcessFunction or FlatMapFunction
 instead of a SinkFunction?

 Thanks,

 Andrew




>>>
>>>
>>> --
>>>
>>> Konstantin Knauf | Solutions Architect
>>>
>>> +49 160 91394525
>>>
>>>
>>> Follow us @VervericaData Ververica 
>>>
>>>
>>> --
>>>
>>> Join Flink Forward  - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Tony) Cheng
>>>
>>


Re: Does flink support retries on checkpoint write failures

2020-01-29 Thread Till Rohrmann
Hi Richard,

googling a bit indicates that this might actually be a GCS problem [1, 2,
3]. The proposed solution/workaround so far is to retry the whole upload
operation as part of the application logic. Since I assume that you are
writing to GCS via Hadoop's file system this should actually fall into the
realm of the Hadoop file system implementation and not Flink.

What you could do to mitigate the problem a bit is to set the number of
tolerable checkpoint failures to a non-zero value via
`CheckpointConfig.setTolerableCheckpointFailureNumber`. Setting this to `n`
means that the job will only fail and then restart after `n` checkpoint
failures. Unfortunately, we do not support a failure rate yet.
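
A minimal sketch of that setting (the checkpoint interval and the value 3 are
just examples):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000); // checkpoint every 60 s
    // Tolerate up to 3 checkpoint failures before failing (and restarting) the job.
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);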

[1] https://github.com/googleapis/google-cloud-java/issues/3586
[2] https://github.com/googleapis/google-cloud-java/issues/5704
[3] https://issuetracker.google.com/issues/137168102

Cheers,
Till

On Tue, Jan 28, 2020 at 6:25 PM Richard Deurwaarder  wrote:

> Hi all,
>
> We've got a Flink job running on 1.8.0 which writes its state (rocksdb) to
> Google Cloud Storage[1]. We've noticed that jobs with a large amount of
> state (500gb range) are becoming *very* unstable. In the order of
> restarting once an hour or even more.
>
> The reason for this instability is that we run into "410 Gone"[4] errors
> from Google Cloud Storage. This indicates an upload (write from Flink's
> perspective) took place and it wanted to resume the write[2] but could not
> find the file which it needed to resume. My guess is this is because the
> previous attempt either failed or perhaps it uploads in chunks of 67mb [3].
>
> The library logs this line when this happens:
>
> "Encountered status code 410 when accessing URL
> https://www.googleapis.com/upload/storage/v1/b//o?ifGenerationMatch=0&name=job-manager/15aa2391-a055-4bfd-8d82-e9e4806baa9c/8ae818761055cdc022822010a8b4a1ed/chk-52224/_metadata&uploadType=resumable&upload_id=AEnB2UqJwkdrQ8YuzqrTp9Nk4bDnzbuJcTlD5E5hKNLNz4xQ7vjlYrDzYC29ImHcp0o6OjSCmQo6xkDSj5OHly7aChH0JxxXcg.
> Delegating to response handler for possible retry."
>
> We're kind of stuck on these questions:
> * Is Flink capable of doing these retries?
> * Does anyone successfully write their (rocksdb) state to Google Cloud
> storage for bigger state sizes?
> * Is it possible flink renames or deletes certain directories before all
> flushes have been done based on an atomic guarantee provided by HDFS that
> does not hold on other implementations perhaps? A race condition of sorts
>
> Basically does anyone recognize this behavior?
>
> Regards,
>
> Richard Deurwaarder
>
> [1] We use an HDFS implementation provided by Google
> https://github.com/GoogleCloudDataproc/bigdata-interop/tree/master/gcs
> [2]
> https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
> [3]
> https://github.com/GoogleCloudDataproc/bigdata-interop/blob/master/gcs/CONFIGURATION.md
>  (see
> fs.gs.outputstream.upload.chunk.size)
> [4] Stacktrace:
> https://gist.github.com/Xeli/da4c0af2c49c060139ad01945488e492
>


Re: is streaming outer join sending unnecessary traffic?

2020-01-29 Thread Till Rohrmann
Hi Kant,

I am not an expert on Flink's SQL implementation. Hence, I'm pulling in
Timo and Jark who might help you with your question.

Cheers,
Till

On Tue, Jan 28, 2020 at 10:46 PM kant kodali  wrote:

> Sorry. fixed some typos.
>
> I am doing a streaming outer join across four Kafka topics; let's call them
> sample1, sample2, sample3, sample4. Each of these test topics has just one
> column, which is a tuple containing a single string. My query is this:
>
> SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0 FULL 
> OUTER JOIN sample3 on sample2.f0=sample3.f0 FULL OUTER JOIN sample4 on 
> sample3.f0=sample4.f0
>
>
> And here is how I send messages to those Kafka topics at various times.
>
> At time t1 Send a message "flink" to test-topic1
>
> (true,flink,null,null,null) // Looks good
>
> At time t2 Send a message "flink" to test-topic4
>
> (true,null,null,null,flink) // Looks good
>
> At time t3 Send a message "flink" to test-topic3
>
> (false,null,null,null,flink) // Looks good
> (true,null,null,flink,flink) //Looks good
>
> At time t4 Send a message "flink" to test-topic2
>
> (false,flink,null,null,null) // Looks good
> (false,null,null,flink,flink) // Looks good
> *(true,null,null,null,flink) // Redundant?*
> *(false,null,null,null,flink) // Redundant?*
> (true,flink,flink,flink,flink) //Looks good
>
> Assume t1 < t2 < t3 < t4.
> Those two rows above seem to be redundant to me, although the end result is
> correct. I don't see the same behavior if I join two topics. These
> redundant messages can lead to a lot of database operations underneath, so is
> there any way to optimize this? I am using Flink 1.9, so not sure if this is
> already fixed in 1.10.
>
> Attached the code as well.
>
> Thanks!
> kant
>
>
> On Tue, Jan 28, 2020 at 1:43 PM kant kodali  wrote:
>
>> Hi All,
>>
>> I am doing a streaming outer join from four topics in Kafka lets call
>> them sample1, sample2, sample3, sample4. Each of these test topics has just
>> one column which is of tuple string. my query is this
>>
>> SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0 FULL 
>> OUTER JOIN sample3 on sample2.f0=sample3.f0 FULL OUTER JOIN sample4 on 
>> sample3.f0=sample4.f0
>>
>>
>> And here is how I send messages to those Kafka topics at various times.
>>
>> At time t1 Send a message "flink" to test-topic1
>>
>> (true,flink,null,null,null) // Looks good
>>
>> At time t2 Send a message "flink" to test-topic4
>>
>> (true,null,null,null,flink) // Looks good
>>
>> At time t3 Send a message "flink" to test-topic3
>>
>> (false,null,null,null,flink) // Looks good
>> (true,null,null,flink,flink) //Looks good
>>
>> At time t3 Send a message "flink" to test-topic2
>>
>> (false,flink,null,null,null) // Looks good
>> (false,null,null,flink,flink) // Looks good
>> *(true,null,null,null,flink) // Redundant?*
>> *(false,null,null,null,flink) // Redundant?*
>> (true,flink,flink,flink,flink) //Looks good
>>
>> Those two rows above seem to be redundant to be although the end result
>> is correct. Doesn't see the same behavior if I join two topics. This
>> unwanted message will lead to a lot of database operations underneath so
>> any way to optimize this? I am using Flink 1.9 so not sure if this is
>> already fixed in 1.10.
>>
>> Attached the code as well.
>>
>> Thanks!
>> kant
>>
>>
>>
>>


Re: Flink+YARN HDFS replication factor

2020-01-29 Thread Till Rohrmann
Hi Piper,

in general, Flink does not store transient data such as event data on HDFS.
Event data (data which is sent between the TaskManagers for processing) is
only kept in memory and, if it becomes too big, spilled by some operators to
local disk.

What Flink stores on HDFS (given it is configured this way) is the state
data which is part of the job's checkpoints. Moreover, Flink stores job
information such as the JobGraph and the corresponding blobs (jars and job
artifacts) on HDFS if configured to do so.
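
A minimal sketch of the kind of configuration that sends checkpoint state to HDFS
(the checkpoint path and interval are just examples); the HDFS replication factor
then applies to these checkpoint files, not to in-flight event data:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000); // checkpoint every 60 s
    env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));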

Cheers,
Till

On Wed, Jan 29, 2020 at 7:07 AM Piper Piper  wrote:

> Hello,
>
> When using Flink+YARN (with HDFS) and having a long running Flink session
> (mode) cluster with a Flink client submitting jobs, the HDFS could have a
> replication factor greater than 1 (example 3).
>
> So, I would like to know when and how any of the data (like event-data or
> batch-data) or code (like JAR) in a Flink job is saved to the HDFS and is
> replicated in the entire YARN cluster of nodes?
>
> For example, in streaming applications, would all the event-data only be
> in memory (RAM) until it reaches the DAG's sink and then must be saved into
> HDFS?
>
> Thank you,
>
> Piper
>


Re: Apache Flink Job fails repeatedly due to RemoteTransportException

2020-01-29 Thread Till Rohrmann
Hi M Singh,

have you checked the TaskManager logs
of ip-xx-xxx-xxx-xxx.ec2.internal/xx.xxx.xxx.xxx:39623 for any suspicious
logging statements? This might help to uncover why another node thinks that
this TaskManager is no longer reachable.

You could also try whether the same problem remains if you upgrade to one
of Flink latest versions (1.9.1 for example).

Cheers,
Till

On Wed, Jan 29, 2020 at 8:37 AM M Singh  wrote:

> Hi Folks:
>
> We have a streaming Flink application (using v1.6.2) and it dies within 12
> hours.  We have configured the number of restarts, which is 10 at the moment.
>
> Sometimes the job runs for some time and then within a very short time has
> a number of restarts and finally fails.  In other instances, the restarts
> happen randomly. So there is no pattern that I could discern for the
> restarts.
>
> I can increase the restart count but would like to see if there is any
> advice on the root cause of this issue.  I've seen some emails in the
> user groups but could not find any definitive solution or investigation
> steps.
>
> Is there any advice on how to investigate it further or resolve it?
>
> The exception we see in the job manager is:
>
> 2020-01-29 06:15:42,371 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job testJob 
> (d65a52389f9ea30def1fe522bf3956c6) switched from state FAILING to FAILED.
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
> Connection unexpectedly closed by remote task manager 
> 'ip-xx-xxx-xxx-xxx.ec2.internal/xx.xxx.xxx.xxx:39623'. This might indicate 
> that the remote task manager was lost.
>   at 
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
>   at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:377)
>   at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
>   at java.lang.Thread.run(Thread.java:748)
> 2020-01-29 06:15:42,371 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Could not 
> restart the job testJob (d65a52389f9ea30def1fe522bf3956c6) because the 
> restart strategy prevented it.
>
>
>


Re: Does flink support retries on checkpoint write failures

2020-01-29 Thread Richard Deurwaarder
Hi Till,

I'll see if we can ask google to comment on those issues, perhaps they have
a fix in the works that would solve the root problem.
In the meanwhile
`CheckpointConfig.setTolerableCheckpointFailureNumber` sounds very
promising!
Thank you for this. I'm going to try this tomorrow to see if that helps. I
will let you know!

Richard

On Wed, Jan 29, 2020 at 3:47 PM Till Rohrmann  wrote:

> Hi Richard,
>
> googling a bit indicates that this might actually be a GCS problem [1, 2,
> 3]. The proposed solution/workaround so far is to retry the whole upload
> operation as part of the application logic. Since I assume that you are
> writing to GCS via Hadoop's file system this should actually fall into the
> realm of the Hadoop file system implementation and not Flink.
>
> What you could do to mitigate the problem a bit is to set the number of
> tolerable checkpoint failures to a non-zero value via
> `CheckpointConfig.setTolerableCheckpointFailureNumber`. Setting this to `n`
> means that the job will only fail and then restart after `n` checkpoint
> failures. Unfortunately, we do not support a failure rate yet.
>
> [1] https://github.com/googleapis/google-cloud-java/issues/3586
> [2] https://github.com/googleapis/google-cloud-java/issues/5704
> [3] https://issuetracker.google.com/issues/137168102
>
> Cheers,
> Till
>
> On Tue, Jan 28, 2020 at 6:25 PM Richard Deurwaarder 
> wrote:
>
>> Hi all,
>>
>> We've got a Flink job running on 1.8.0 which writes its state (rocksdb)
>> to Google Cloud Storage[1]. We've noticed that jobs with a large amount of
>> state (500gb range) are becoming *very* unstable. In the order of
>> restarting once an hour or even more.
>>
>> The reason for this instability is that we run into "410 Gone"[4] errors
>> from Google Cloud Storage. This indicates an upload (write from Flink's
>> perspective) took place and it wanted to resume the write[2] but could not
>> find the file which it needed to resume. My guess is this is because the
>> previous attempt either failed or perhaps it uploads in chunks of 67mb [3].
>>
>> The library logs this line when this happens:
>>
>> "Encountered status code 410 when accessing URL
>> https://www.googleapis.com/upload/storage/v1/b//o?ifGenerationMatch=0&name=job-manager/15aa2391-a055-4bfd-8d82-e9e4806baa9c/8ae818761055cdc022822010a8b4a1ed/chk-52224/_metadata&uploadType=resumable&upload_id=AEnB2UqJwkdrQ8YuzqrTp9Nk4bDnzbuJcTlD5E5hKNLNz4xQ7vjlYrDzYC29ImHcp0o6OjSCmQo6xkDSj5OHly7aChH0JxxXcg.
>> Delegating to response handler for possible retry."
>>
>> We're kind of stuck on these questions:
>> * Is flink capable or doing these retries?
>> * Does anyone succesfully write their (rocksdb) state to Google Cloud
>> storage for bigger state sizes?
>> * Is it possible flink renames or deletes certain directories before all
>> flushes have been done based on an atomic guarantee provided by HDFS that
>> does not hold on other implementations perhaps? A race condition of sorts
>>
>> Basically does anyone recognize this behavior?
>>
>> Regards,
>>
>> Richard Deurwaarder
>>
>> [1] We use an HDFS implementation provided by Google
>> https://github.com/GoogleCloudDataproc/bigdata-interop/tree/master/gcs
>> [2]
>> https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
>> [3]
>> https://github.com/GoogleCloudDataproc/bigdata-interop/blob/master/gcs/CONFIGURATION.md
>>  (see
>> fs.gs.outputstream.upload.chunk.size)
>> [4] Stacktrace:
>> https://gist.github.com/Xeli/da4c0af2c49c060139ad01945488e492
>>
>


fliter and flatMap operation VS only a flatMap operation

2020-01-29 Thread Soheil Pourbafrani
Hi,

In case we need a filter operation followed by a transformation, which one
is more efficient in Flink: applying the filter operation first and then a
flatMap operation separately, OR using only a flatMap operation that
internally includes the filter logic, too?

best
Soheil


Re: Does flink support retries on checkpoint write failures

2020-01-29 Thread wvl
Forgive my lack of knowledge here - I'm a bit out of my league.

But I was wondering: if we allow e.g. 1 checkpoint to fail, and whatever caused
that failure also caused a record to be lost (e.g. a RocksDB exception /
TaskManager crash / etc.), there would be no source rewind to the last
successful checkpoint and this record would be lost forever, correct?

On Wed, 29 Jan 2020, 17:51 Richard Deurwaarder,  wrote:

> Hi Till,
>
> I'll see if we can ask google to comment on those issues, perhaps they
> have a fix in the works that would solve the root problem.
> In the meanwhile
> `CheckpointConfig.setTolerableCheckpointFailureNumber` sounds very
> promising!
> Thank you for this. I'm going to try this tomorrow to see if that helps. I
> will let you know!
>
> Richard
>
> On Wed, Jan 29, 2020 at 3:47 PM Till Rohrmann 
> wrote:
>
>> Hi Richard,
>>
>> googling a bit indicates that this might actually be a GCS problem [1, 2,
>> 3]. The proposed solution/workaround so far is to retry the whole upload
>> operation as part of the application logic. Since I assume that you are
>> writing to GCS via Hadoop's file system this should actually fall into the
>> realm of the Hadoop file system implementation and not Flink.
>>
>> What you could do to mitigate the problem a bit is to set the number of
>> tolerable checkpoint failures to a non-zero value via
>> `CheckpointConfig.setTolerableCheckpointFailureNumber`. Setting this to `n`
>> means that the job will only fail and then restart after `n` checkpoint
>> failures. Unfortunately, we do not support a failure rate yet.
>>
>> [1] https://github.com/googleapis/google-cloud-java/issues/3586
>> [2] https://github.com/googleapis/google-cloud-java/issues/5704
>> [3] https://issuetracker.google.com/issues/137168102
>>
>> Cheers,
>> Till
>>
>> On Tue, Jan 28, 2020 at 6:25 PM Richard Deurwaarder 
>> wrote:
>>
>>> Hi all,
>>>
>>> We've got a Flink job running on 1.8.0 which writes its state (rocksdb)
>>> to Google Cloud Storage[1]. We've noticed that jobs with a large amount of
>>> state (500gb range) are becoming *very* unstable. In the order of
>>> restarting once an hour or even more.
>>>
>>> The reason for this instability is that we run into "410 Gone"[4] errors
>>> from Google Cloud Storage. This indicates an upload (write from Flink's
>>> perspective) took place and it wanted to resume the write[2] but could not
>>> find the file which it needed to resume. My guess is this is because the
>>> previous attempt either failed or perhaps it uploads in chunks of 67mb [3].
>>>
>>> The library logs this line when this happens:
>>>
>>> "Encountered status code 410 when accessing URL
>>> https://www.googleapis.com/upload/storage/v1/b//o?ifGenerationMatch=0&name=job-manager/15aa2391-a055-4bfd-8d82-e9e4806baa9c/8ae818761055cdc022822010a8b4a1ed/chk-52224/_metadata&uploadType=resumable&upload_id=AEnB2UqJwkdrQ8YuzqrTp9Nk4bDnzbuJcTlD5E5hKNLNz4xQ7vjlYrDzYC29ImHcp0o6OjSCmQo6xkDSj5OHly7aChH0JxxXcg.
>>> Delegating to response handler for possible retry."
>>>
>>> We're kind of stuck on these questions:
>>> * Is flink capable or doing these retries?
>>> * Does anyone succesfully write their (rocksdb) state to Google Cloud
>>> storage for bigger state sizes?
>>> * Is it possible flink renames or deletes certain directories before all
>>> flushes have been done based on an atomic guarantee provided by HDFS that
>>> does not hold on other implementations perhaps? A race condition of sorts
>>>
>>> Basically does anyone recognize this behavior?
>>>
>>> Regards,
>>>
>>> Richard Deurwaarder
>>>
>>> [1] We use an HDFS implementation provided by Google
>>> https://github.com/GoogleCloudDataproc/bigdata-interop/tree/master/gcs
>>> [2]
>>> https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
>>> [3]
>>> https://github.com/GoogleCloudDataproc/bigdata-interop/blob/master/gcs/CONFIGURATION.md
>>>  (see
>>> fs.gs.outputstream.upload.chunk.size)
>>> [4] Stacktrace:
>>> https://gist.github.com/Xeli/da4c0af2c49c060139ad01945488e492
>>>
>>


Status of FLINK-12692 (Support disk spilling in HeapKeyedStateBackend)

2020-01-29 Thread Ken Krugler
Hi Yu Li,

It looks like this stalled out a bit, from May of last year, and won’t make it 
into 1.10.

I’m wondering if there’s a version in Blink (as a completely separate state 
backend?) that could be tried out?

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Using retained checkpoints as savepoints

2020-01-29 Thread Ken Krugler
Hi all,

Currently
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/checkpoints.html#difference-to-savepoints
says checkpoints…

"do not support Flink specific features like rescaling"

But I believe they do, and really must if you can use them like a savepoint. 
Should that sentence be changed, or removed?

Also this page doesn’t talk about state migration, which is another aspect of 
restarting a (modified) workflow from a retained checkpoint…will that work?

This sentence about checkpoints on
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint
implies not:

"Optimizations towards those goals can exploit certain properties, e.g. that 
the job code doesn’t change between the execution attempts"

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Task-manager kubernetes pods take a long time to terminate

2020-01-29 Thread Li Peng
Hey folks, I'm deploying a Flink cluster via kubernetes, and starting each
task manager with taskmanager.sh. I noticed that when I tell kubectl to
delete the deployment, the job-manager pod usually terminates very quickly,
but any task-manager that doesn't get terminated before the job-manager,
usually gets stuck in this loop:

2020-01-29 09:18:47,867 INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
resolve ResourceManager address
akka.tcp://flink@job-manager:6123/user/resourcemanager,
retrying in 1 ms: Could not connect to rpc endpoint under address
akka.tcp://flink@job-manager:6123/user/resourcemanager

It then does this for about 10 minutes(?), and then shuts down. If I'm
deploying a new cluster, this pod will try to register itself with the new
job manager before terminating later. This isn't a troubling issue as far as
I can tell, but I find it annoying that I sometimes have to force delete
the pods.

Any easy ways to just have the task managers terminate gracefully and
quickly?

Thanks,
Li


FsStateBackend vs RocksDBStateBackend

2020-01-29 Thread Ran Zhang
Hi all,

We have a Flink app that uses a KeyedProcessFunction; the function
requires a ValueState (of a TreeSet), and the processElement method needs to
access and update it. We tried to use RocksDB as our state backend but the
performance was not good, and intuitively we think it was because of the
serialization / deserialization on each processElement call. Then we tried
switching to the FsStateBackend (which keeps the in-flight data in the
TaskManager's memory according to the docs), and it resolved the
performance issue. *So we want to better understand the tradeoffs
in choosing between these 2 state backends.* Our checkpoint size is 200 - 300
GB in steady state. For now we know one benefit of RocksDB is that it supports
incremental checkpoints, but we would love to know what else we are losing in
choosing the FsStateBackend.

Thanks a lot!
Ran Zhang


Re: FsStateBackend vs RocksDBStateBackend

2020-01-29 Thread Tzu-Li (Gordon) Tai
Hi Ran,

On Thu, Jan 30, 2020 at 9:39 AM Ran Zhang  wrote:

> Hi all,
>
> We have a Flink app that uses a KeyedProcessFunction, and in the function
> it requires a ValueState(of TreeSet) and the processElement method needs to
> access and update it. We tried to use RocksDB as our stateBackend but the
> performance is not good, and intuitively we think it was because of the
> serialization / deserialization on each processElement call.
>

As you have already pointed out, serialization behaviour is a major
difference between the 2 state backends, and will directly impact
performance due to the extra runtime overhead in RocksDB.
If you plan to continue using the RocksDB state backend, make sure to use
MapState instead of ValueState where possible, since every access to the
ValueState in the RocksDB backend requires serializing / deserializing the
whole value.
For MapState, de-/serialization happens per K-V access. Whether or not this
makes sense would of course depend on your state access pattern.
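
A hypothetical sketch of that suggestion applied to the described use case (the
function, state name, and key/value types are placeholders, not Ran's actual code):

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Instead of ValueState<TreeSet<Long>>, keep one MapState entry per element so the
    // RocksDB backend only de-/serializes the touched entry on each access.
    public class PerEntryStateFunction extends KeyedProcessFunction<String, Long, Long> {

        private transient MapState<Long, Boolean> seen;

        @Override
        public void open(Configuration parameters) {
            seen = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("seen", Long.class, Boolean.class));
        }

        @Override
        public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
            if (!seen.contains(value)) {   // single-entry lookup, not a whole-set deserialize
                seen.put(value, true);
                out.collect(value);
            }
        }
    }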


> Then we tried to switch to use FsStateBackend (which keeps the in-flight
> data in the TaskManager’s memory according to doc), and it could resolve
> the performance issue. *So we want to understand better what are the
> tradeoffs in choosing between these 2 stateBackend.* Our checkpoint size
> is 200 - 300 GB in stable state. For now we know one benefits of RocksDB is
> it supports incremental checkpoint, but would love to know what else we are
> losing in choosing FsStateBackend.
>

As of now, feature-wise both backends support asynchronous snapshotting,
state schema evolution, and access via the State Processor API.
In the end, the major factor for deciding between the two state backends
would be your expected state size.
That being said, it could be possible in the future that savepoint formats
for the backends are changed to be compatible, meaning that you will be
able to switch between different backends upon restore [1].


>
> Thanks a lot!
> Ran Zhang
>

Cheers,
Gordon

 [1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State


Re: TableSource being duplicated

2020-01-29 Thread Benchao Li
Hi Benoît,

Do you mean that if you register one TableSource and add two sinks on top of
the same TableSource, the source gets duplicated?
If so, maybe you can check
*TableEnvironmentImpl.isEagerOperationTranslation*; it's *false* by
default. But in *StreamTableEnvironmentImpl* it's *true*, because eager
translation is needed to keep alignment with the DataStream API.
If you don't need Table <-> DataStream translation, you can just
use TableEnvironmentImpl instead of StreamTableEnvironmentImpl to achieve
your goal.

Hope it helps.
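
A minimal sketch of that suggestion (assumes Flink 1.9+ with the Blink planner;
whether it avoids the source duplication for your particular plan still needs to
be verified):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    // Create a plain TableEnvironment (lazy translation) instead of a
    // StreamTableEnvironment (eager translation tied to the DataStream API).
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    TableEnvironment tEnv = TableEnvironment.create(settings);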

Benoît Paris  wrote on Thu, Jan 23, 2020 at 6:50 AM:

> Hello all!
>
> I'm having a problem with TableSources' DataStream being duplicated when
> pulled on from 2 sinks.
>
> I understand that sometimes the best plan might just be to duplicate and
> read both times a TableSource/SourceFunction; but in my case I can't quite
> reproduce the data as say Kafka would. I just need the SourceFunction and
> DataStream provided by the TableSource to not be duplicated.
>
> As a workaround to this issue, I introduce some sort of materialization
> barrier that makes the planner pull only on one instance of the
> TableSource/SourceFunction:
> Instead of:
>
> tEnv.registerTableSource("foo_table", new FooTableSource());
>
> I convert it to an Append Stream, and back again to a Table:
>
> tEnv.registerTableSource("foo_table_source", new FooTableSource());
> Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source");
> Table appendingSourceTable = tEnv.fromDataStream(
> tEnv.toAppendStream(sourceTable, Types.ROW(new String[]{"field_1"}, 
> new TypeInformation[]{Types.LONG()}))
> );
> tEnv.registerTable("foo_table", appendingSourceTable);
>
> And the conversion to an Append Stream somewhat makes the planner behave
> and there is only one DataSource in the execution plan.
>
> But I'm feeling like I'm just missing a simple option (on the
> SourceFunction, or on the TableSource?) to invoke and declare the Source as
> being non duplicateable.
>
> I have tried a lot of options (uid(), operation chaining restrictions,
> twiddling the transformation, forceNonParallel(), etc.), but can't find
> quite how to do that! My SourceFunction is a RichSourceFunction
>
> At this point I'm wondering if this is a bug, or if it is a feature that
> would have to be implemented.
>
> Cheers,
> Ben
>
>
>
>
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: fliter and flatMap operation VS only a flatMap operation

2020-01-29 Thread Tzu-Li (Gordon) Tai
Hi,

If your filter and flatMap operators are chained, then the performance
difference should not be noticeable.
If a shuffle (i.e. a keyBy operation) occurs after the filter and before
the flatMap, then applying the filter first will be more efficient.

Cheers,
Gordon
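
A small runnable sketch of the two variants (the tokenizing logic is just an
example); with default chaining both variants run inside a single task:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class FilterFlatMapChaining {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Variant 1: separate filter + flatMap, chained into one task by default.
            env.fromElements("a b", "", "c d")
               .filter(s -> !s.isEmpty())
               .flatMap((String s, Collector<String> out) -> {
                   for (String token : s.split(" ")) {
                       out.collect(token);
                   }
               })
               .returns(Types.STRING)
               .print();

            // Variant 2: one flatMap that also contains the filter logic.
            env.fromElements("a b", "", "c d")
               .flatMap((String s, Collector<String> out) -> {
                   if (!s.isEmpty()) {
                       for (String token : s.split(" ")) {
                           out.collect(token);
                       }
                   }
               })
               .returns(Types.STRING)
               .print();

            env.execute("filter-vs-flatmap");
        }
    }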

On Thu, Jan 30, 2020 at 4:03 AM Soheil Pourbafrani 
wrote:

> Hi,
>
> In case we need to filter operation followed by a transformation, which
> one is more efficient in Flink, applying the filter operation first and
> then a flatMap operation separately OR using only a flatMap operation that
> internally includes the filter logic, too?
>
> best
> Soheil
>