Re: How to join stream and batch data in Flink?

2018-09-21 Thread Hequn Cheng
Hi

+1 for vino's answer.
Also, this kind of join will be supported in FLINK-9712. You can check more
details in the jira.

Best, Hequn

On Fri, Sep 21, 2018 at 4:51 PM vino yang  wrote:

> Hi Henry,
>
> There are three ways I can think of:
>
> 1) use DataStream API, implement a flatmap UDF to access dimension table;
> 2) use table/sql API, implement a UDTF to access dimension table;
> 3) customize the table/sql join API/statement's implementation (and change
> the physical plan)
>
> Thanks, vino.
>
徐涛 wrote on Fri, Sep 21, 2018 at 4:43 PM:
>
>> Hi All,
>> Sometimes a “dimension table” needs to be joined with the "fact
>> table" if the data are not joined before being sent to Kafka.
>> So if the data are joined in Flink, does the “dimension table”
>> have to be imported as a stream, or are there other ways to achieve it?
>> Thanks a lot!
>>
>> Best
>> Henry
>
>


Flink 1.5.4 -- issues w/ TaskManager connecting to ResourceManager

2018-09-21 Thread Jamie Grier
Anybody else seen this?  I'm running both the JM and TM on the same host in
this setup.  This was working fine w/ Flink 1.5.3.

On the TaskManager:

00:31:30.268 INFO  o.a.f.r.t.TaskExecutor - Could not resolve
ResourceManager address akka.tcp://flink@localhost:6123/user/resourcemanager,
retrying in 1 ms: Ask timed out on
[ActorSelection[Anchor(akka.tcp://flink@localhost:6123/),
Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message
of type "akka.actor.Identify"..

On the JobManager:

00:32:00.339 ERROR a.r.EndpointWriter - dropping message [class
akka.actor.ActorSelectionMessage] for non-local recipient
[Actor[akka.tcp://flink@localhost:6123/]] arriving at
[akka.tcp://flink@localhost:6123] inbound addresses are
[akka.tcp://flink@cluster:6123]
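The JobManager log only accepts akka.tcp://flink@cluster:6123 while the TaskManager
resolves akka.tcp://flink@localhost:6123, so the two sides disagree on the
ResourceManager address. For reference, a minimal sketch of the flink-conf.yaml keys
involved (the hostname is a placeholder, and that this mismatch is the root cause
here is only an assumption):

```
# Both JobManager and TaskManager must use the same resolvable hostname here.
# "cluster" is a placeholder for the host the JobManager actually binds to.
jobmanager.rpc.address: cluster
jobmanager.rpc.port: 6123
```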


Re: Running Flink in Google Cloud Platform (GCP) - can Flink be truly elastic?

2018-09-21 Thread Dawid Wysakowicz
Hi Alexander,

I've redirected your question to the user mailing list. The community list is
meant for "broader community discussions related to meetups, conferences, blog
posts and job offers".

The quick answer to your question is that dynamic scaling of Flink jobs is a
work in progress. Maybe Gary or Till (cc'ed) can share some more details on
that topic.
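
Until that lands, rescaling is usually done manually through a savepoint. A rough
sketch of the CLI steps (job id, savepoint path, parallelism and jar name are
placeholders):

```
# take a savepoint and stop the running job
bin/flink cancel -s s3://my-bucket/savepoints <jobId>

# resubmit the same job with a higher parallelism, restoring from the savepoint
bin/flink run -p 8 -s s3://my-bucket/savepoints/savepoint-xxxx my-job.jar
```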

Best,

Dawid


On 21/09/18 17:25, alexander.gard...@rbs.com.INVALID wrote:
> Hi
>
> I'm trying to understand what it means to run a Flink cluster inside the 
> Google Cloud Platform and whether it can act in an "elastic" way; if the 
> cluster needs more resources to accommodate a sudden demand or increase in 
> Flink jobs, will GCP automatically detect this and spool up more Task 
> Managers to provide extra task slots?
>
> If we consider the following two simple use cases, how would GCP address them?
>
>
> 1) No free task slots to run new flink jobs
>
> 2) A slow flink job needs an increased parallelism to improve throughput
>
> Currently, we'd handle the above use cases by:
>
>
> 1) Noticing that the job failed due to "no free slots" (by checking the
> exception text), adding a new task manager, and rerunning the job once task
> slots are available again.
>
> 2) Monitoring the speed of the job ourselves, stopping the job, specifying
> which components (operators) in the stream required an increase in parallelism
> (for example via job properties), then relaunching the job; if not enough slots
> were available, we'd have to consider adding extra task managers.
>
>
> So my question is...can Google Cloud Platform (GCP) automatically launch 
> extra TMs to handle the above?
>
> If we proposed to run a Flink cluster in a GCP container, can GCP make Flink 
> behave dynamically elastic in the same way that Google DataFlow apparently 
> can?
>
> Regards
>
>
> Alex
>
>






Re: How does flink read data from kafka number of TM's are more than topic partitions

2018-09-21 Thread Piotr Nowojski
No problem :)

Piotrek

> On 21 Sep 2018, at 15:04, Taher Koitawala  wrote:
> 
> Thanks a lot for the explanation. That was exactly what I thought should
> happen. However, it is always good to have a clear confirmation.
> 
> 
> Regards,
> Taher Koitawala
> GS Lab Pune
> +91 8407979163
> 
> 
> On Fri, Sep 21, 2018 at 6:26 PM Piotr Nowojski wrote:
> Hi,
> 
> Yes, in your case half of the Kafka source tasks wouldn’t read/process any 
> records (you can check that in web UI). This shouldn’t harm you, unless your 
> records will be redistributed after the source. For example:
> 
> source.keyBy(..).process(new MyVeryHeavyOperator()).print()
> 
> Should be fine, because `keyBy(…)` will redistribute records. However
> 
> source.map(new MyVeryHeavyOperator()).print()
> 
> Will mean that half of `MyVeryHeavyOperator`s will be idling as well. To 
> solve that, you might want to consider using 
> 
> dataStream.rebalance();
> 
> Piotrek
> 
>> On 21 Sep 2018, at 13:25, Taher Koitawala wrote:
>> 
>> Hi All,
>>  Let's say a topic in kafka has 5 partitions. If I spawn 10 Task 
>> Managers with 1 slot each and parallelism is 10 then how will records be 
>> read from the kafka topic if I use the FlinkKafkaConsumer to read.
>> 
>> Will 5 TM's read and the rest be idle in that case? Does oversubscribing the
>> number of TM's relative to the number of partitions in the Kafka topic
>> guarantee high throughput?
>>  
>> Regards,
>> Taher Koitawala
>> GS Lab Pune
>> +91 8407979163
> 



Re: How does flink read data from kafka number of TM's are more than topic partitions

2018-09-21 Thread Taher Koitawala
Thanks a lot for the explanation. That was exactly what I thought should
happen. However, it is always good to have a clear confirmation.


Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163


On Fri, Sep 21, 2018 at 6:26 PM Piotr Nowojski 
wrote:

> Hi,
>
> Yes, in your case half of the Kafka source tasks wouldn’t read/process any
> records (you can check that in web UI). This shouldn’t harm you, unless
> your records will be redistributed after the source. For example:
>
> source.keyBy(..).process(new MyVeryHeavyOperator()).print()
>
> Should be fine, because `keyBy(…)` will redistribute records. However
>
> source.map(new MyVeryHeavyOperator()).print()
>
> Will mean that half of `MyVeryHeavyOperator`s will be idling as well. To
> solve that, you might want to consider using
>
> dataStream.rebalance();
>
> Piotrek
>
> On 21 Sep 2018, at 13:25, Taher Koitawala 
> wrote:
>
> Hi All,
>  Let's say a topic in kafka has 5 partitions. If I spawn 10 Task
> Managers with 1 slot each and parallelism is 10 then how will records be
> read from the kafka topic if I use the FlinkKafkaConsumer to read.
>
> Will 5 TM's read and the rest be idle in that case? Does oversubscribing
> the number of TM's relative to the number of partitions in the Kafka topic
> guarantee high throughput?
>
> Regards,
> Taher Koitawala
> GS Lab Pune
> +91 8407979163
>
>
>


Re: How does flink read data from kafka number of TM's are more than topic partitions

2018-09-21 Thread Piotr Nowojski
Hi,

Yes, in your case half of the Kafka source tasks wouldn’t read/process any 
records (you can check that in web UI). This shouldn’t harm you, unless your 
records will be redistributed after the source. For example:

source.keyBy(..).process(new MyVeryHeavyOperator()).print()

Should be fine, because `keyBy(…)` will redistribute records. However

source.map(new MyVeryHeavyOperator()).print()

Will mean that half of `MyVeryHeavyOperator`s will be idling as well. To solve 
that, you might want to consider using 

dataStream.rebalance();
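
To make that concrete, here is a minimal, self-contained sketch of the pattern
(topic name, Kafka properties and the stand-in for MyVeryHeavyOperator are made
up; the exact consumer class depends on your Kafka version):

```java
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class RebalanceExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10); // 10 slots, but the topic only has 5 partitions

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "example-group");           // placeholder

        env
            // Only 5 of the 10 source subtasks will be assigned a partition.
            .addSource(new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props))
            // Round-robin the records so that all 10 downstream subtasks receive work.
            .rebalance()
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase(); // stands in for MyVeryHeavyOperator
                }
            })
            .print();

        env.execute("rebalance-example");
    }
}
```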

Piotrek

> On 21 Sep 2018, at 13:25, Taher Koitawala  wrote:
> 
> Hi All,
>  Let's say a topic in kafka has 5 partitions. If I spawn 10 Task 
> Managers with 1 slot each and parallelism is 10 then how will records be read 
> from the kafka topic if I use the FlinkKafkaConsumer to read.
> 
> Will 5 TM's read and the rest be idle in that case? Does oversubscribing the
> number of TM's relative to the number of partitions in the Kafka topic
> guarantee high throughput?
>  
> Regards,
> Taher Koitawala
> GS Lab Pune
> +91 8407979163



Re: Data loss when restoring from savepoint

2018-09-21 Thread Juho Autio
Thanks, Andrey.

> so it means that the savepoint does not lose at least some dropped
records.

I'm not sure what you mean by that? It was known from the beginning that not
everything is lost before/after restoring a savepoint, just some records
around the time of restoration. It's not 100% clear whether records are lost
before making a savepoint or after restoring it. Based on the new DEBUG logs
it seems more like we lose some records that are seen ~soon after restoring.
It seems like Flink is somehow confused about the restored state vs. new
inserts into the state. This could also be somehow linked to the high back
pressure on the kafka source while the stream is catching up.

> If it is feasible for your setup, I suggest to insert one more map
function after reduce and before sink.
> etc.

Isn't that the same thing that we discussed before? Nothing is sent to
BucketingSink before the window closes, so I don't see how it would make
any difference if we replace the BucketingSink with a map function or
another sink type. We don't create or restore savepoints during the time
when BucketingSink gets input or has open buckets – that happens at a much
later time of day. I would focus on figuring out why the records are lost
while the window is open. But I don't know how to do that. Would you have
any additional suggestions?

On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin 
wrote:

> Hi Juho,
>
> so it means that the savepoint does not lose at least some dropped
> records.
>
> If it is feasible for your setup, I suggest inserting one more map
> function after the reduce and before the sink.
> The map function should be called right after the window is triggered but
> before flushing to s3.
> The result of the reduce (the deduped record) could be logged there.
> This should allow us to check whether the processed distinct records were
> buffered in the state after the restoration from the savepoint or not. If
> they were buffered, we should see that there was an attempt to write them to
> the sink from the state.
>
> Another suggestion is to try to write the records to some other sink or to
> both.
> E.g. if you can access the file system of the workers, maybe just write into
> local files and check whether the records are also dropped there.
>
> Best,
> Andrey
>
> On 20 Sep 2018, at 15:37, Juho Autio  wrote:
>
> Hi Andrey!
>
> I was finally able to gather the DEBUG logs that you suggested. In short,
> the reducer logged that it processed at least some of the ids that were
> missing from the output.
>
> "At least some", because I didn't have the job running with DEBUG logs for
> the full 24-hour window period. So I was only able to look up if I can find
> *some* of the missing ids in the DEBUG logs. Which I did indeed.
>
> I changed the DistinctFunction.java to do this:
>
> @Override
> public Map<String, String> reduce(Map<String, String> value1,
> Map<String, String> value2) {
> LOG.debug("DistinctFunction.reduce returns: {}={}",
> value1.get("field"), value1.get("id"));
> return value1;
> }
>
> Then:
>
> vi flink-1.6.0/conf/log4j.properties
> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>
> Then I ran the following kind of test:
>
> - Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC
> 2018
> - Started a new cluster & job with DEBUG enabled at ~09:13, restored from
> that previous cluster's savepoint
> - Ran until caught up offsets
> - Cancelled the job with a new savepoint
> - Started a new job _without_ DEBUG, which restored the new savepoint, let
> it keep running so that it will eventually write the output
>
> Then on the next day, after results had been flushed when the 24-hour
> window closed, I compared the results again with a batch version's output.
> And found some missing ids as usual.
>
> I drilled down to one specific missing id (I'm replacing the actual value
> with AN12345 below), which was not found in the stream output, but was
> found in batch output & flink DEBUG logs.
>
> Related to that id, I gathered the following information:
>
> 2018-09-18~09:13:21,000 job started & savepoint is restored
>
> 2018-09-18 09:14:29,085 missing id is processed for the first time, proved
> by this log line:
> 2018-09-18 09:14:29,085 DEBUG
> com.rovio.ds.flink.uniqueid.DistinctFunction  -
> DistinctFunction.reduce returns: s.aid1=AN12345
>
> 2018-09-18 09:15:14,264 first synchronous part of checkpoint
> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint
>
> (
> more occurrences of checkpoints (~1 min checkpointing time + ~1 min delay
> before next)
> /
> more occurrences of DistinctFunction.reduce
> )
>
> 2018-09-18 09:23:45,053 missing id is processed for the last time
>
> 2018-09-18~10:20:00,000 savepoint created & job cancelled
>
> To be noted, there was high backpressure after restoring from savepoint
> until the stream caught up with the kafka offsets. Although, our job uses
> assign timestamps & 

Re: Flink can't initialize operator state backend when starting from checkpoint

2018-09-21 Thread Marvin777
Hi,

I do not use custom serializers, and my job contains only a source and a
sink (BucketingSink). What causes this phenomenon in general?

> I suggest that you also update to a newer version, at least the latest
> bugfix release

Which version does this sentence refer to? And could you please point me to
the issue about this topic?

Thanks a lot.



Stefan Richter wrote on Fri, Sep 21, 2018 at 4:48 PM:

> Hi,
>
> that is correct. If you are using custom serializers you should double
> check their correctness, maybe using our test base for type serializers.
> Another reason could be that you modified the job in a way that silently
> changed the schema somehow. Concurrent use of serializers across different
> threads can also cause problems like this and I think there was a bug in
> 1.4 around this topic. I suggest that you also update to a newer version,
> at least the latest bugfix release.
>
> Best,
> Stefan
>
> On 21.09.2018 at 10:26, vino yang wrote:
>
> Hi Qingxiang,
>
> Several days ago, Stefan described the causes of this anomaly in a problem
> similar to this:
> Typically, these problems have been observed when something was wrong with
> a serializer or a stateful serializer was used from multiple threads.
>
> Thanks, vino.
>
> Marvin777 wrote on Fri, Sep 21, 2018 at 3:20 PM:
>
>> Hi all,
>>
>> When the Flink (1.4.2) job starts, it can find the checkpoint files on HDFS, but
>> an exception occurs during deserialization:
>>
>> 
>>
>> Do you have any insight on this?
>>
>> Thanks,
>> Qingxiang Ma
>>
>
>


Re: Data loss when restoring from savepoint

2018-09-21 Thread Andrey Zagrebin
Hi Juho,

so it means that the savepoint does not lose at least some dropped records.

If it is feasible for your setup, I suggest inserting one more map function
after the reduce and before the sink.
The map function should be called right after the window is triggered but before
flushing to s3.
The result of the reduce (the deduped record) could be logged there.
This should allow us to check whether the processed distinct records were buffered
in the state after the restoration from the savepoint or not. If they were
buffered, we should see that there was an attempt to write them to the sink from
the state.
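
For illustration, a minimal sketch of such a pass-through logging map (the class
name is made up; the Map<String, String> element type and the "field"/"id" keys
are assumptions based on the DistinctFunction you posted):

```java
import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pass-through map placed between the window reduce and the sink.
 * It only logs every deduped record that leaves the window, so we can check
 * whether the missing ids were ever emitted towards the sink after the restore.
 */
public class LogAfterWindowFunction
        implements MapFunction<Map<String, String>, Map<String, String>> {

    private static final Logger LOG = LoggerFactory.getLogger(LogAfterWindowFunction.class);

    @Override
    public Map<String, String> map(Map<String, String> value) {
        LOG.info("Window emitted: {}={}", value.get("field"), value.get("id"));
        return value; // forward the record unchanged to the sink
    }
}
```

It would be wired in roughly as reduce(...).map(new LogAfterWindowFunction())
before the existing sink.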

Another suggestion is to try to write the records to some other sink or to both.
E.g. if you can access the file system of the workers, maybe just write into
local files and check whether the records are also dropped there.

Best,
Andrey

> On 20 Sep 2018, at 15:37, Juho Autio  wrote:
> 
> Hi Andrey!
> 
> I was finally able to gather the DEBUG logs that you suggested. In short, the 
> reducer logged that it processed at least some of the ids that were missing 
> from the output.
> 
> "At least some", because I didn't have the job running with DEBUG logs for 
> the full 24-hour window period. So I was only able to look up if I can find 
> some of the missing ids in the DEBUG logs. Which I did indeed.
> 
> I changed the DistinctFunction.java to do this:
> 
> @Override
> public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
> LOG.debug("DistinctFunction.reduce returns: {}={}", 
> value1.get("field"), value1.get("id"));
> return value1;
> }
> 
> Then:
> 
> vi flink-1.6.0/conf/log4j.properties
> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
> 
> Then I ran the following kind of test:
> 
> - Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC 2018
> - Started a new cluster & job with DEBUG enabled at ~09:13, restored from 
> that previous cluster's savepoint
> - Ran until caught up offsets
> - Cancelled the job with a new savepoint
> - Started a new job _without_ DEBUG, which restored the new savepoint, let it 
> keep running so that it will eventually write the output
> 
> Then on the next day, after results had been flushed when the 24-hour window 
> closed, I compared the results again with a batch version's output. And found 
> some missing ids as usual.
> 
> I drilled down to one specific missing id (I'm replacing the actual value 
> with AN12345 below), which was not found in the stream output, but was found 
> in batch output & flink DEBUG logs.
> 
> Related to that id, I gathered the following information:
> 
> 2018-09-18~09:13:21,000 job started & savepoint is restored
> 
> 2018-09-18 09:14:29,085 missing id is processed for the first time, proved by 
> this log line:
> 2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction
>   - DistinctFunction.reduce returns: s.aid1=AN12345
> 
> 2018-09-18 09:15:14,264 first synchronous part of checkpoint
> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint
> 
> (
>   more occurrences of checkpoints (~1 min checkpointing time + ~1 min 
> delay before next)
>   /
>   more occurrences of DistinctFunction.reduce
> )
> 
> 2018-09-18 09:23:45,053 missing id is processed for the last time
> 
> 2018-09-18~10:20:00,000 savepoint created & job cancelled
> 
> To be noted, there was high backpressure after restoring from the savepoint until
> the stream caught up with the kafka offsets. However, our job assigns
> timestamps & watermarks on the flink kafka consumer itself, so the event time of
> all partitions is synchronized. As expected, we don't get any late data in
> the late data side output.
> 
> From this we can see that the missing ids are processed by the reducer, but 
> they must get lost somewhere before the 24-hour window is triggered.
> 
> I think it's worth mentioning once more that the stream doesn't miss any ids
> if we let it run without interruptions / state restoring.
> 
> What's next?
> 
> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin  > wrote:
> Hi Juho,
> 
> > only when the 24-hour window triggers, BucketingSink gets a burst of input
> 
> This is of course totally true, my understanding is the same. We cannot
> exclude a problem there for sure; it's just that savepoints are used a lot
> without problem reports, and BucketingSink is known to be problematic with s3.
> That is why I asked you:
> 
> > You also wrote that the timestamps of the lost events are 'probably' around
> > the time of the savepoint; if that is not yet known for sure, I would also check it.
> 
> That said, the bucketing sink might lose data at the end of the day (also
> from the middle). The fact that it is always around the time of taking a
> savepoint and not random is surely suspicious, and possible savepoint
> failures need to be investigated.
> 
> Regarding the s3 problem, s3 doc says:
> 
> > The 

Re: ***UNCHECKED*** Error while confirming Checkpoint

2018-09-21 Thread Stefan Richter
Hi,

could you provide some logs for this problematic job? I would like to
double-check why this violated precondition actually happened.

Thanks,
Stefan

> On 20.09.2018 at 17:24, Stefan Richter wrote:
> 
> FYI, here a link to my PR: https://github.com/apache/flink/pull/6723
> 
>> On 20.09.2018 at 14:52, Stefan Richter wrote:
>> 
>> Hi,
>> 
>> I think the failing precondition is too strict because sometimes a 
>> checkpoint can overtake another checkpoint and in that case the commit is 
>> already subsumed. I will open a Jira and PR with a fix.
>> 
>> Best,
>> Stefan
>> 
>>> On 19.09.2018 at 10:04, PedroMrChaves wrote:
>>> 
>>> Hello,
>>> 
>>> I have a running Flink job that reads data from one Kafka topic, applies
>>> some transformations and writes data back into another Kafka topic. The job
>>> sometimes restarts due to the following error:
>>> 
>>> /java.lang.RuntimeException: Error while confirming checkpoint
>>>  at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>>  at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>  at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>  at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>  at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no
>>> transaction pending
>>>  at
>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>>  at
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:258)
>>>  at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:130)
>>>  at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:650)
>>>  at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1255)
>>>  ... 5 more
>>> 2018-09-18 22:00:10,716 INFO 
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Could not
>>> restart the job Alert_Correlation (3c60b8670c81a629716bb2e42334edea) because
>>> the restart strategy prevented it.
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>  at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>>  at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>  at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>  at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>  at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no
>>> transaction pending
>>>  at
>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>>  at
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:258)
>>>  at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:130)
>>>  at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:650)
>>>  at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1255)
>>>  ... 5 more/
>>> 
>>> My state is very small for this particular job, just a few KBs.
>>> 
>>> 
>>>  
>>> 
>>> 
>>> Flink Version: 1.4.2
>>> State Backend: hadoop 2.8
>>> 
>>> Regards,
>>> Pedro Chaves
>>> 
>>> 
>>> 
>>> -
>>> Best Regards,
>>> Pedro Chaves
>>> --
>>> Sent from: 
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>> 
> 



Re: HA failing for 1.6.0 job cluster with docker-compose

2018-09-21 Thread Tzanko Matev
Hi Vino and Till,

That's great news. Thank you!

Cheers,
Tzanko



On Thu, Sep 20, 2018 at 11:43 AM vino yang  wrote:

> Hi all,
>
> Oh, I took this ticket, will fix it as soon as possible.
>
> Thanks, vino.
>
> Till Rohrmann wrote on Thu, Sep 20, 2018 at 4:35 PM:
>
>> Hi Tzanko,
>>
>> in order to make the container entrypoint properly work with HA, we need
>> to fix the JobID (see https://issues.apache.org/jira/browse/FLINK-10291).
>> At the moment, we generate a new JobID for every restart of the cluster
>> entrypoint container. Due to that the system cannot find the existing
>> checkpoints.
>>
>> Fixing the JobID is not a big deal and it should be fixed with the next
>> bug fix release.
>>
>> Cheers,
>> Till
>>
>> On Thu, Sep 20, 2018 at 10:12 AM vino yang  wrote:
>>
>>> Hi Tzanko,
>>>
>>> Maybe Till is more appropriate to answer this question.
>>>
>>> Thanks, vino.
>>>
>>> Tzanko Matev wrote on Wed, Sep 19, 2018 at 5:47 PM:
>>>
 Dear all,

 I am currently experimenting with a Flink 1.6.0 job cluster. The goal
 is to run a streaming job on K8s. Right now I am using docker-compose to
 experiment with the job cluster.

 I am trying to set-up HA with Zookeeper, but I seem to fail. I have a
 docker-compose file which contains the following services:
 - Zookeeper
 - Flink job manager
 - Flink task manager

 The containers are set up as per the documentation for docker-compose,
 but I have also set up the necessary HA settings in the conf file. However,
 when I kill the job manager container and start it again, the job being
 processed does not recover but always starts from scratch. Instead I get
 the following error:

 > ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  -
 Could not retrieve the redirect address.
 >
 > java.util.concurrent.CompletionException:
 org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing
 token not set: Ignoring message
 LocalFencedMessage(8c4887f5c13f6d907d82a55d97ac428f,
 LocalRpcInvocation(requestRestAddress(Time))) sent to
 akka.tcp://flink@blockprocessor-job-cluster:5/user/dispatcher
 because the fencing token is null.

 Am I missing something? Is HA implemented for job clusters at all?

 Best wishes,
 Tzanko Matev




How does flink read data from kafka number of TM's are more than topic partitions

2018-09-21 Thread Taher Koitawala
Hi All,
 Let's say a topic in kafka has 5 partitions. If I spawn 10 Task
Managers with 1 slot each and parallelism is 10 then how will records be
read from the kafka topic if I use the FlinkKafkaConsumer to read.

Will 5 TM's read and the rest be idle in that case? Does oversubscribing
the number of TM's relative to the number of partitions in the Kafka topic
guarantee high throughput?

Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163


Re: How to join stream and batch data in Flink?

2018-09-21 Thread vino yang
Hi Henry,

There are three ways I can think of:

1) use the DataStream API and implement a flatmap UDF to access the dimension
table (see the sketch below);
2) use the table/sql API and implement a UDTF to access the dimension table;
3) customize the table/sql join API/statement's implementation (and change
the physical plan)
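
To illustrate option 1), a minimal sketch of a flatmap-style UDF that enriches the
fact stream with a per-record dimension lookup (the JDBC URL, table and field names
are made up; a production version would add caching or async I/O):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/** Enriches each fact record (dimensionId, amount) with a name from a dimension table. */
public class DimensionLookupFlatMap
        extends RichFlatMapFunction<Tuple2<Long, Double>, Tuple3<Long, Double, String>> {

    private transient Connection connection;
    private transient PreparedStatement lookup;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Hypothetical JDBC connection to the dimension table.
        connection = DriverManager.getConnection("jdbc:mysql://dim-db:3306/dims", "user", "pass");
        lookup = connection.prepareStatement("SELECT name FROM dim_table WHERE id = ?");
    }

    @Override
    public void flatMap(Tuple2<Long, Double> fact, Collector<Tuple3<Long, Double, String>> out)
            throws Exception {
        lookup.setLong(1, fact.f0);
        try (ResultSet rs = lookup.executeQuery()) {
            // Emit the enriched record, or drop it if the dimension row is missing.
            if (rs.next()) {
                out.collect(Tuple3.of(fact.f0, fact.f1, rs.getString("name")));
            }
        }
    }

    @Override
    public void close() throws Exception {
        if (lookup != null) { lookup.close(); }
        if (connection != null) { connection.close(); }
    }
}
```

It would be applied as factStream.flatMap(new DimensionLookupFlatMap()).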

Thanks, vino.

徐涛 wrote on Fri, Sep 21, 2018 at 4:43 PM:

> Hi All,
> Sometimes a “dimension table” needs to be joined with the "fact
> table" if the data are not joined before being sent to Kafka.
> So if the data are joined in Flink, does the “dimension table”
> have to be imported as a stream, or are there other ways to achieve it?
> Thanks a lot!
>
> Best
> Henry


Re: Flink can't initialize operator state backend when starting from checkpoint

2018-09-21 Thread Stefan Richter
Hi,

that is correct. If you are using custom serializers you should double check 
their correctness, maybe using our test base for type serializers. Another 
reason could be that you modified the job in a way that silently changed the 
schema somehow. Concurrent use of serializers across different threads can also 
cause problems like this and I think there was a bug in 1.4 around this topic. 
I suggest that you also update to a newer version, at least the latest bugfix 
release.

Best,
Stefan

> On 21.09.2018 at 10:26, vino yang wrote:
> 
> Hi Qingxiang,
> 
> Several days ago, Stefan described the causes of this anomaly in a problem 
> similar to this:
> Typically, these problems have been observed when something was wrong with a 
> serializer or a stateful serializer was used from multiple threads.
> 
> Thanks, vino.
> 
> Marvin777 wrote on Fri, Sep 21, 2018 at 3:20 PM:
> Hi all,
> 
> When the Flink (1.4.2) job starts, it can find the checkpoint files on HDFS, but
> an exception occurs during deserialization:
> 
> 
> 
> Do you have any insight on this?
> 
> Thanks,
> Qingxiang Ma



How to join stream and batch data in Flink?

2018-09-21 Thread 徐涛
Hi All,
Sometimes a “dimension table” needs to be joined with the "fact
table" if the data are not joined before being sent to Kafka.
So if the data are joined in Flink, does the “dimension table” have to
be imported as a stream, or are there other ways to achieve it?
Thanks a lot!

Best
Henry

Re: S3 connector Hadoop class mismatch

2018-09-21 Thread Paul Lam
Hi Stefan, Stephan,

Yes, the `hadoop.security.group.mapping` option is explicitly set to 
`org.apache.hadoop.security.LdapGroupsMapping`. Guess that was why the 
classloader found an unshaded class. 

I don’t have the permission to change the Hadoop cluster configurations so I 
modified the `core-default-shaded.xml` and marked the option as final to solve 
the problem, after which the class loading exceptions were gone. 

But another problem came up (likely not related to the previous problem):

In the case of the old bucketing sink (version 1.5.3), it seems that
`org.apache.hadoop.fs.s3a.S3AFileSystem` is initialized twice before the task
starts running. The first initialization is done by
`org.apache.flink.fs.s3hadoop.S3FileSystemFactory` and works well, but the
second one is done by the bucketing sink itself, and fails to leverage the
`s3.*` parameters like the access key and the secret key.
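
For reference, by `s3.*` parameters I mean flink-conf.yaml entries like the
following (values are placeholders), which as far as I understand are only
picked up by Flink's own S3 filesystem factory:

```
s3.access-key: <my-access-key>
s3.secret-key: <my-secret-key>
```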

The stack traces are as below:

```
com.amazonaws.AmazonClientException: Unable to load AWS credentials from any 
provider in the chain
at 
com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
at 
com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
at 
com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1307)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:426)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:370)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
at java.lang.Thread.run(Thread.java:748)
```

I haven’t figured out why the s3a filesystem needs to be initialized twice. And
is it a bug that the bucketing sink does not use the filesystem factories to
create the filesystem?

Thank you very much!

Best,
Paul Lam


> On Sep 20, 2018, at 23:35, Stephan Ewen wrote:
> 
> Hi!
> 
> A few questions to diagnose/fix this:
> 
>  Do you explicitly configure the "hadoop.security.group.mapping"?
> 
>   - If not, this setting may have leaked in from a Hadoop config in the 
> classpath. We are fixing this in Flink 1.7, to make this insensitive to such 
> settings leaking in.
> 
>   - If yes, then please try setting the config variable to 
> "hadoop.security.group.mapping: 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.LdapGroupsMapping"?
> 
> Please let us know if that works!
> 
> 
> 
> On Thu, Sep 20, 2018 at 1:40 PM, Stefan Richter  
> wrote:
> Hi,
> 
> I could not find any open Jira for the problem you describe. Could you please 
> open one?
> 
> Best,
> Stefan
> 
> > On 19.09.2018 at 09:54, Paul Lam wrote:
> > 
> > Hi, 
> > 
> > I’m using StreamingFileSink of 1.6.0 to write logs to S3 and encounter a 
> > classloader problem. It seems that there are conflicts in 
> > flink-shaded-hadoop2-uber-1.6.0.jar and flink-s3-fs-hadoop-1.6.0.jar, and 
> > maybe related to class loading orders. 
> > 
> > Did anyone meet this problem? Thanks a lot!
> > 
> > The stack traces are as below:
> > 
> > java.io.IOException: java.lang.RuntimeException: class 
> > org.apache.hadoop.security.LdapGroupsMapping not 
> > org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.GroupMappingServiceProvider
> >   at 
> > org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory.create(AbstractFileSystemFactory.java:62)
> >   at 
> > org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
> >   at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
> >   at 
> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:111)
> >   at 
> > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
> >   at 
> > 

Re: Flink can't initialize operator state backend when starting from checkpoint

2018-09-21 Thread vino yang
Hi Qingxiang,

Several days ago, Stefan described the causes of this anomaly in a problem
similar to this:
Typically, these problems have been observed when something was wrong with
a serializer or a stateful serializer was used from multiple threads.

Thanks, vino.

Marvin777 wrote on Fri, Sep 21, 2018 at 3:20 PM:

> Hi all,
>
> When the Flink (1.4.2) job starts, it can find the checkpoint files on HDFS, but
> an exception occurs during deserialization:
>
> [image: image.png]
>
> Do you have any insight on this?
>
> Thanks,
> Qingxiang Ma
>


Re: multiple flink applications on yarn are shown in one application.

2018-09-21 Thread Stefan Richter
Hi,

I see from your command that you are using the same jar file twice, so I first
want to double-check how you even determine which job should be started.
Depending on your answer to that question, I am also adding Till (in CC); he
might have some additional thoughts.

Best,
Stefan

> On 21.09.2018 at 04:15, weilongxing wrote:
> 
> Thank you for your help.
> 
> It is per-job mode if I submit the job detached.
> 
> However, I submitted these two jobs using the commands below.
> > /data/apps/opt/flink/bin/flink run -m yarn-cluster -d 
> > /data/apps/opt/fluxion/fluxion-flink.jar 
> > /data/apps/conf/fluxion//job_submit_rpt_company_performan.conf
> > /data/apps/opt/flink/bin/flink run -m yarn-cluster -d 
> > /data/apps/opt/fluxion/fluxion-flink.jar 
> > /data/apps/conf/fluxion//job_submit_rpt_company_user_s.conf
> 
> I browsed the yarn applications. As the picture shows, I got 2 applications (0013
> / 0012), but the job in both applications is the same. I can’t find the job that
> was submitted second. The job in application_XXX_0013 should be
> rpt_company_user_s. This does not happen in session mode.
> 
> Best
> LX
> 
> 
> 
> 
> 
> 
>> On Sep 20, 2018, at 7:13 PM, Stefan Richter wrote:
>> 
>> Hi,
>> 
>> currently, Flink still has to use session mode under the hood if you submit
>> the job in attached mode. The reason is that the job could consist of
>> multiple parts that need to run one after the other. This will be changed
>> in the future and also should not happen if you
>> submit the job detached.
>> 
>> Best,
>> Stefan
>> 
>>> On 20.09.2018 at 10:29, weilongxing wrote:
>>> 
>>> I am new to Flink. I am using Flink on yarn per-job. I submitted two 
>>> applications.
>>> 
>>> > /data/apps/opt/flink/bin/flink run -m yarn-cluster  
>>> > /data/apps/opt/fluxion/fluxion-flink.jar 
>>> > /data/apps/conf/fluxion//job_submit_rpt_company_user_s.conf
>>> 
>>> > /data/apps/opt/flink/bin/flink run -m yarn-cluster  
>>> > /data/apps/opt/fluxion/fluxion-flink.jar 
>>> > /data/apps/conf/fluxion//job_submit_rpt_company_performan_.conf
>>> 
>>> I can see these two applications on yarn. I noticed that the application
>>> name is “flink session cluster” rather than “flink per-job”. Is that right?
>>> 
>>> 
>>> However, I can see both flink jobs in each yarn application. Is that right?
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> And finally, I wanted to kill one job, so I killed one yarn application.
>>> However, that yarn application was killed but both flink jobs restarted in
>>> the other yarn application. I want to kill one and keep the other. In my
>>> opinion, there should be one job per application, and the job should be killed
>>> when the yarn application is killed.
>>> 
>>> 
>>> 
>>> 
>>> I think the problem is that these two applications should be “flink per-job”
>>> rather than “flink session cluster”. But I don’t know why they become “flink
>>> session cluster”. Can anybody help? Thanks.
>>> 
>>> 
>>> 
>> 
> 



Flink can't initialize operator state backend when starting from checkpoint

2018-09-21 Thread Marvin777
Hi all,

When the Flink (1.4.2) job starts, it can find the checkpoint files on HDFS, but
an exception occurs during deserialization:

[image: image.png]

Do you have any insight on this?

Thanks,
Qingxiang Ma