Re: Why OnDataStream in flink accepts partial function but DataStream do not

2019-01-31 Thread Dawid Wysakowicz
Hi,

AFAIK it clashes with

def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R]

if you use a partial function. You can check the discussion
behind the OnDataStream implementation in this JIRA issue [1].

[1] https://issues.apache.org/jira/browse/FLINK-1159
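
As a quick illustration of the clash (a minimal sketch; the Event case class and the stream are made up for the example):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._

case class Event(id: String, value: Int)

val events: DataStream[Event] = ???   // any source

// Does not compile: with both map(fun: T => R) and map(mapper: MapFunction[T, R])
// in scope, the compiler cannot resolve the anonymous pattern-matching function:
//   events.map { case Event(id, v) => (id, v) }

// mapWith only has the (T => R) variant, so the partial function is accepted:
val pairs: DataStream[(String, Int)] = events.mapWith { case Event(id, v) => (id, v) }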

On 31/01/2019 04:10, Renkai wrote:
> As shown in
> https://ci.apache.org/projects/flink/flink-docs-master/dev/scala_api_extensions.html
> , we can use Scala partial functions by
>
> import org.apache.flink.streaming.api.scala.extensions._
>
> and replace .map with .mapWith. But the signatures of def mapWith[R:
> TypeInformation](fun: T => R): DataStream[R] and def map[R:
> TypeInformation](fun: T => R): DataStream[R] are exactly the same. So how
> does the different behavior happen?
>
>
>
>





Re: Data loss when restoring from savepoint

2019-01-31 Thread Juho Autio
Hello, is there anyone that could help with this?

On Fri, Jan 11, 2019 at 8:14 AM Juho Autio  wrote:

> Stefan, would you have time to comment?
>
> On Wednesday, January 2, 2019, Juho Autio  wrote:
>
>> Bump – does anyone know if Stefan will be available to comment on the latest
>> findings? Thanks.
>>
>> On Fri, Dec 21, 2018 at 2:33 PM Juho Autio  wrote:
>>
>>> Stefan, I managed to analyze savepoint with bravo. It seems that the
>>> data that's missing from output *is* found in savepoint.
>>>
>>> I simplified my test case to the following:
>>>
>>> - job 1 has been running for ~10 days
>>> - savepoint X created & job 1 cancelled
>>> - job 2 started with restore from savepoint X
>>>
>>> Then I waited until the next day so that job 2 has triggered the 24 hour
>>> window.
>>>
>>> Then I analyzed the output & savepoint:
>>>
>>> - compare job 2 output with the output of a batch pyspark script => find
>>> 4223 missing rows
>>> - pick one of the missing rows (say, id Z)
>>> - read savepoint X with bravo, filter for id Z => Z was found in the
>>> savepoint!
>>>
>>> How can it be possible that the value is in state but doesn't end up in
>>> output after state has been restored & window is eventually triggered?
>>>
>>> I also did a similar analysis on the previous case where I savepointed &
>>> restored the job multiple times (5) within the same 24-hour window. A
>>> missing id that I drilled down to was found in all of those savepoints,
>>> yet missing from the output that gets written at the end of the day. This
>>> is even more surprising: the missing ID was written to the new
>>> savepoints even after restoring. Is the reducer state somehow decoupled
>>> from the window contents?
>>>
>>> Big thanks to bravo developer Gyula for guiding me through so that I was
>>> able to read the reducer state! https://github.com/king/bravo/pull/11
>>>
>>> Gyula also had an idea for how to troubleshoot the missing data in a
>>> scalable way: I could add some "side effect kafka output" on individual
>>> operators. This should allow tracking more closely at which point the data
>>> gets lost. However, maybe this would have to be in some of Flink's internal
>>> components, and I'm not sure which those would be.
>>>
>>> Cheers,
>>> Juho
>>>
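
(For what it's worth, a minimal sketch of the kind of "side effect output" Gyula suggested: a pass-through operator that mirrors every element to a side output, to which any debug sink, e.g. a Kafka producer, can be attached. The Record type, stream and debugSink below are placeholders, not names from the actual job.)

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class Record(id: String, ts: Long)              // placeholder element type

val debugTag = OutputTag[String]("debug")

class DebugTap extends ProcessFunction[Record, Record] {
  override def processElement(value: Record,
                              ctx: ProcessFunction[Record, Record]#Context,
                              out: Collector[Record]): Unit = {
    out.collect(value)                                // forward the element unchanged
    ctx.output(debugTag, s"${value.id},${value.ts}")  // mirror a compact trace to the side output
  }
}

val stream: DataStream[Record] = ???                  // the stream to be tapped
val debugSink: SinkFunction[String] = ???             // e.g. a Kafka producer for a debug topic

val tapped = stream.process(new DebugTap)
tapped.getSideOutput(debugTag).addSink(debugSink)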
>>> On Mon, Nov 19, 2018 at 11:52 AM Juho Autio 
>>> wrote:
>>>

 Hi Stefan,

 Bravo doesn't currently support reading a reducer state. I gave it a
 try but couldn't get to a working implementation yet. If anyone can provide
 some insight on how to make this work, please share at github:
 https://github.com/king/bravo/pull/11

 Thanks.

 On Tue, Oct 23, 2018 at 3:32 PM Juho Autio 
 wrote:

> I was glad to find that bravo has now been updated to support
> installing it to a local Maven repo.
>
> I was able to load a checkpoint created by my job, thanks to the
> example provided in the bravo README, but I'm still missing the essential
> piece.
>
> My code was:
>
> OperatorStateReader reader = new OperatorStateReader(env2,
> savepoint, "DistinctFunction");
> DontKnowWhatTypeThisIs reducingState =
> reader.readKeyedStates(what should I put here?);
>
> I don't know how to read the values collected from reduce() calls in
> the state. Is there a way to access the reducing state of the window with
> bravo? I'm a bit confused about how this works, because when I check with a
> debugger, Flink internally uses a ReducingStateDescriptor
> with name=window-contents, yet reading operator state for
> "DistinctFunction" at least didn't throw an exception ("window-contents"
> threw one – obviously there's no operator by that name).
>
> Cheers,
> Juho
>
> On Mon, Oct 15, 2018 at 2:25 PM Juho Autio 
> wrote:
>
>> Hi Stefan,
>>
>> Sorry but it doesn't seem immediately clear to me what's a good way
>> to use https://github.com/king/bravo.
>>
>> How are people using it? Would you, for example, modify build.gradle
>> somehow to publish bravo as a library locally/internally? Or add code
>> directly in the bravo project (locally) and run it from there (using an
>> IDE, for example)? Also, it doesn't seem like the bravo Gradle project
>> supports building a Flink job jar, but if it does, how do I do it?
>>
>> Thanks.
>>
>> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio 
>> wrote:
>>
>>> Good then, I'll try to analyze the savepoints with Bravo. Thanks!
>>>
>>> > How would you assume that backpressure would influence your
>>> updates? Updates to each local state still happen event-by-event, in a
>>> single reader/writing thread.
>>>
>>> Sure, just an ignorant guess by me. I'm not familiar with most of
>>> Flink's internals. Anyway, high backpressure is not seen on this job
>>> after it has caught up the lag, so I thought it would be worth
>>> mentioning.
>>>

Re: Forking a stream with Flink

2019-01-31 Thread Daniel Krenn
I don't get what happened here. Did Selvaraj just hijack this question? Or
what is going on?

On Tue, Jan 29, 2019 at 17:01, Selvaraj chennappan <
selvarajchennap...@gmail.com> wrote:

> I think there is a misunderstanding. I want to compare the raw JSON and the
> transformed record.
> Hence I need two consumers and to merge the streams for comparison.
> I have a pipeline defined. The pipeline does source (Kafka),
> transformation, dedup and persisting to DB.
> [image: image.png]
>
> Before reaching the DB task, a lot of transformation is applied in the
> pipeline. Therefore I want to validate the transformed record against the raw
> JSON message which is available in Kafka.
>
> Hence I want to know how to do that in Flink.
>
>
> On Tue, Jan 29, 2019 at 8:54 PM Puneet Kinra <
> puneet.ki...@customercentria.com> wrote:
>
>> Hi Selvaraj
>>
>> In your POJO, add a data member such as "status", and set it to error when
>> the record is invalid. Pass the output of the flatMap
>> to the split operator; there you can split the stream.
>>
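
(A minimal Scala sketch of the status-field-plus-split idea above; Record, dbSink and errorSink are placeholders, and note that side outputs are the recommended replacement for split in newer Flink versions.)

import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._

case class Record(id: String, status: String)     // placeholder POJO with a status field

val records: DataStream[Record] = ???             // output of the validating flatMap
val dbSink: SinkFunction[Record] = ???            // placeholder DB sink
val errorSink: SinkFunction[Record] = ???         // placeholder error-topic sink

// route each record by the status flag set during validation
val splitStream = records.split(r => if (r.status == "error") Seq("error") else Seq("ok"))

splitStream.select("ok").addSink(dbSink)
splitStream.select("error").addSink(errorSink)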
>> On Tue, Jan 29, 2019 at 6:39 PM Selvaraj chennappan <
>> selvarajchennap...@gmail.com> wrote:
>>
>>> UseCase: We have a Kafka consumer to read messages (JSON), which are then
>>> passed to a flatMap for transformation based on the rules (the rules are
>>> complex) and converted to a POJO.
>>> We want to verify that the record (POJO) is valid by checking it field by
>>> field. If the record is invalid due to the transformation rules, it moves
>>> to an error topic; otherwise it is sent to the DB.
>>>
>>> I thought of implementing this by adding another consumer to read the JSON
>>> message and comparing the JSON message attributes with the transformed
>>> record attributes.
>>>
>>> Hence I need to join/coprocess these two streams to validate, then decide
>>> whether to persist to the DB or send to the error topic.
>>>
>>> Please let me know if you need more information.
>>>
>>> On Tue, Jan 29, 2019 at 6:21 PM miki haiat  wrote:
>>>
 I'm not sure I got your question correctly; can you elaborate more on
 your use case?

>>>
>>>
>>> --
>>>
>>>
>>>
>>>
>>>
>>> Regards,
>>> Selvaraj C
>>>
>>
>>
>> --
>> Cheers
>>
>> Puneet Kinra
>>
>> Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com
>>
>> e-mail: puneet.ki...@customercentria.com
>>
>>
>>
>
> --
>
>
>
>
>
> Regards,
> Selvaraj C
>


Re: Forking a stream with Flink

2019-01-31 Thread Dawid Wysakowicz
Hi Daniel,

The answer to your original question is that you can just keyBy [1], e.g. by the
machineId, and then computations on the KeyedStream are applied independently
for each key.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/#datastream-transformations
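
For illustration, a minimal sketch (the Reading case class and its fields are made up for the example):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class Reading(machineId: String, value: Double)

val readings: DataStream[Reading] = ???   // e.g. a Kafka source

// after keyBy, the windowing and reduce below run independently per machineId
readings
  .keyBy(_.machineId)
  .timeWindow(Time.minutes(1))
  .reduce((a, b) => Reading(a.machineId, a.value + b.value))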

On 31/01/2019 12:55, Daniel Krenn wrote:
> I don't get what happened here. Did Selvaraj just hijack this
> question? Or what is going on?
>
> On Tue, Jan 29, 2019 at 17:01, Selvaraj chennappan
> <selvarajchennap...@gmail.com> wrote:
>
>     I think there is a misunderstanding. I want to compare the raw JSON and
>     the transformed record.
>     Hence I need two consumers and to merge the streams for comparison.
>     I have a pipeline defined. The pipeline does source (Kafka),
>     transformation, dedup and persisting to DB.
>     image.png
>
>     Before reaching the DB task, a lot of transformation is applied in
>     the pipeline. Therefore I want to validate the transformed record against
>     the raw JSON message which is available in Kafka.
>
>     Hence I want to know how to do that in Flink.
>
>
>     On Tue, Jan 29, 2019 at 8:54 PM Puneet Kinra
>     <puneet.ki...@customercentria.com> wrote:
>
>         Hi Selvaraj
>
>         In your POJO, add a data member such as "status", and set it to
>         error when the record is invalid. Pass the output of the flatMap
>         to the split operator; there you can split the stream.
>
>         On Tue, Jan 29, 2019 at 6:39 PM Selvaraj chennappan
>         <selvarajchennap...@gmail.com> wrote:
>
>             UseCase: We have a Kafka consumer to read messages (JSON),
>             which are then passed to a flatMap for transformation based on
>             the rules (the rules are complex) and converted to a POJO.
>             We want to verify that the record (POJO) is valid by checking
>             it field by field. If the record is invalid due to the
>             transformation rules, it moves to an error topic; otherwise it
>             is sent to the DB.
>
>             I thought of implementing this by adding another consumer to
>             read the JSON message and comparing the JSON message attributes
>             with the transformed record attributes.
>
>             Hence I need to join/coprocess these two streams to validate,
>             then decide whether to persist to the DB or send to the error
>             topic.
>
>             On Tue, Jan 29, 2019 at 6:21 PM miki haiat
>             <miko5...@gmail.com> wrote:
>
>                 I'm not sure I got your question correctly; can you
>                 elaborate more on your use case?
>
>
>
> -- 
>
>
>
>
>
> Regards,
> Selvaraj C
>
>
>
>         --
>         Cheers
>
>         Puneet Kinra
>
>         Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com
>
>         e-mail: puneet.ki...@customercentria.com
>
>
>
>
> -- 
>
>
>
>
>
> Regards,
> Selvaraj C
>




Continuous Monitoring of back-pressure

2019-01-31 Thread David Corley
I'm currently writing some code to convert the back-pressure REST API data
into Prometheus-compatible output. I was just curious why back-pressure
isn't already exposed as a metric in the built-in Prometheus exporter. Is
it because the thread sampling is too intensive? Or too slow (particularly
if running multiple jobs)? In our case we're running a single job per
cluster. Any feedback would be appreciated.
Regards,
Dave
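
(For reference, a rough sketch of such a converter, assuming the JobManager REST API is reachable on localhost:8081 and that the back-pressure endpoint and its "ratio" field look as described in the REST API docs; both should be verified against the Flink version in use.)

import scala.io.Source

object BackpressureToPrometheus {
  def main(args: Array[String]): Unit = {
    val jobId    = args(0)
    val vertexId = args(1)
    // assumed endpoint; adjust host, port and path to your setup
    val url  = s"http://localhost:8081/jobs/$jobId/vertices/$vertexId/backpressure"
    val body = Source.fromURL(url).mkString

    // crude extraction of the per-subtask "ratio" fields without a JSON library;
    // a real converter should parse the response properly
    val ratioPattern = """"ratio"\s*:\s*([0-9.]+)""".r
    val maxRatio = ratioPattern.findAllMatchIn(body)
      .map(_.group(1).toDouble)
      .foldLeft(0.0)((a, b) => math.max(a, b))

    // Prometheus text exposition format
    println(s"""flink_backpressure_ratio{job_id="$jobId",vertex_id="$vertexId"} $maxRatio""")
  }
}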


Re: Writing a custom Rocksdb statistics collector

2019-01-31 Thread Harshvardhan Agrawal
It looks like the DBOptions that are created by the OptionsFactory class
are used for opening RocksDB.

And yes I missed the fact that DBOptions is not serializable. Thanks for
pointing that out. I will go through the metrics exposed via Flink. But
does this mean that there is no good way of getting native RocksDB metrics in
Flink?

On Wed, Jan 30, 2019 at 23:07 Yun Tang  wrote:

> Hi Harshvardhan
>
> First of all, 'DBOptions' is not serializable, so I think you cannot include
> it in the source constructor.
>
> I am also wondering whether the given `DBOptions` could be used to query
> RocksDB's statistics, since they are not the actual options used to open RocksDB.
>
> We have tried to report RocksDB's statistics each time the RocksDB
> state backend snapshots, but this solution means you have to modify the RocksDB
> state backend's source code. By the way, Flink supports reporting some
> native metrics [1]; hope this could be helpful.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#rocksdb-native-metrics
>
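(For reference, the native metrics from [1] are switched on per metric in flink-conf.yaml; a minimal example, with key names assumed from that page, so please verify against the docs for your Flink version:

state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.estimate-live-data-size: true

Each enabled metric is then reported through Flink's normal metrics system, e.g. via a configured metrics reporter.)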
>
> Best
> Yun Tang
> --
> *From:* Harshvardhan Agrawal 
> *Sent:* Thursday, January 31, 2019 0:23
> *To:* user
> *Subject:* Writing a custom Rocksdb statistics collector
>
>
> Hi,
>
> I am currently trying to integrate RocksDB statistics in my pipeline.
>
> The basic idea is that we want to pass RocksDB stats through the same
> pipeline that is doing our processing and write them to Elasticsearch so that
> we can visualize them in Kibana.
>
> I have written a custom source function that takes in the DBOptions object
> from the stream environment; the source function then uses this DBOptions
> object to continuously query RocksDB for metrics. Here's the code:
>
> public class RocksDBStatsStreamRunner {
>
>     public static void main(String[] args) throws IOException {
>         final StreamExecutionEnvironment streamEnv =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("/tmp", true);
>         rocksDBStateBackend.setOptions(new MyOptionsFactory());
>         streamEnv.setStateBackend(rocksDBStateBackend);
>
>         DBOptions dbOptions =
>                 ((RocksDBStateBackend) streamEnv.getStateBackend()).getDbOptions();
>         streamEnv.addSource(new RocksDBStatisticsSource(dbOptions));
>     }
> }
>
> public RocksDBStatisticsSource(DBOptions dbOptions) {
>     this(dbOptions, DEFAULT_SLEEP_TIME_MS);
> }
>
> public RocksDBStatisticsSource(DBOptions dbOptions, long waitTimeMs) {
>     this.dbOptions = dbOptions;
>     this.waitTimeMs = waitTimeMs;
> }
>
> @Override
> public void stop() {
>     this.isRunning = false;
> }
>
> @Override
> public void run(SourceContext sourceContext) throws Exception {
>     while (isRunning) {
>         // create rocksdb statistics object
>         // query rocksdb for statistics using the options field
>         // sourceContext.collect(rocksdbStats object)
>         // sleep
>     }
> }
>
> @Override
> public void cancel() {
>     this.isRunning = false;
> }
>
> I am assuming that we will get a separate RocksDB options object for each
> of the slots. Is this a good way to approach this problem? Do you think
> this will work?
>
> Thanks in advance! :)
> --
>
> Regards, Harshvardhan Agrawal
>
-- 
Regards,
Harshvardhan


how to use Hadoop Inputformats with flink shaded s3?

2019-01-31 Thread Cliff Resnick
I need to process some Parquet data from S3 as a unioned input in my
DataStream pipeline. From what I know, this requires using the
hadoop AvroParquetInputFormat. The problem I'm running into is that it also
requires using un-shaded Hadoop classes that conflict with the Flink shaded
hadoop3 FileSystem. The pipeline otherwise runs fine with the shaded fs.

Has anyone successfully read Parquet data using the Flink shaded S3 fs? If
so, can you please clue me in?


Re: No resource available error while testing HA

2019-01-31 Thread Averell
Hi Gary,

I faced a similar problem yesterday, but I don't know the cause yet.
The situation that I observed is as follows:
 - At about 2:57, one of my EMR execution nodes (IP ...99) got disconnected
from the YARN resource manager (on the RM I could not see that node anymore),
even though the node was still running. <<< This is another issue, but I
believe it is with YARN.
 - About 8 hours after that (between 10:00 and 11:00), I turned the
problematic EMR core node off. AWS spun up another node and added it to the
cluster as a replacement. The YARN RM soon recognized the new node and added it
to its list of available nodes.
However, the JM seemed unable to do anything after that. It kept
trying to start the job, failing after the timeout with that "no resource
available" exception again and again. No jobmanager logs were recorded after
2:57:15 though.

I am attaching the logs collected via "yarn logs --applicationId 
here. But it seems I still missed something.

I am using Flink 1.7.1, with yarn-site configuration
yarn.resourcemanager.am.max-attempts=5. Flink configurations are all of the
default values.

Thanks and best regards,
Averell

(attachment: flink.log)





Videos and slides on Flink Forward Beijing

2019-01-31 Thread Paul Lam
Hi,

It’s been a while since Flink Forward Beijing; will the videos and slides be
made available on the website? Thanks!

Best,
Paul Lam



Get UnknownTaskExecutorException when I add a new configuration in flink-conf.yaml

2019-01-31 Thread yinhua.dai
Hi Community,

I added the item below to flink-conf.yaml, and I see an
UnknownTaskExecutorException each time I start Flink on Windows via
start-cluster.bat:

fs.s3a.aws.credentials.provider: com.tr.apt.sqlengine.tables.aws.HadoopAWSCredentialsProviderChain

I'm sure that this new configuration is successfully read by Flink and the
class is already on the classpath, and there is no error if I remove this
line. I'm using Flink 1.6.0.

2019-02-01 15:40:57,148 ERROR
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler
- Implementation error: Unhandled exception.
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:
No TaskExecutor registered under d6f5b0f7dd86432ffb515edea31d4f01.
    at org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:558)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


