Re: Slow Flink program

2018-03-14 Thread Aljoscha Krettek
Hi,

If you didn't configure your program to use RocksDB then you're already not 
using RocksDB. I think the main issue, as others have pointed out, is that by 
keying on a constant key you're essentially turning your program into a 
parallelism-of-1 program, thereby wasting almost all cluster resources.
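To make the constant-key effect concrete, here is a plain-Java sketch (no Flink dependency) of how keyed records are spread over parallel subtasks. Each key is hashed into one of a fixed number of key groups, and contiguous ranges of key groups belong to one subtask. The hash below is a murmur3 finalizer used as a stand-in, and the max parallelism of 1024 is an assumption; Flink's actual hash and defaults may differ, so treat the exact indices as illustrative. Whatever the hash, a constant key always lands on the same subtask.

```java
import java.util.HashSet;
import java.util.Set;

/** Illustrative model of key -> key group -> subtask routing (names are hypothetical). */
final class KeySkewDemo {
    // Assumed number of key groups; Flink derives its own default from the job parallelism.
    static final int MAX_PARALLELISM = 1024;

    /** Murmur3 32-bit finalizer, used here only as a stand-in for Flink's internal key hash. */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Maps a key to a key group, then maps the key group to a parallel subtask index. */
    static int subtaskFor(Object key, int parallelism) {
        int keyGroup = Math.floorMod(mix(key.hashCode()), MAX_PARALLELISM);
        return keyGroup * parallelism / MAX_PARALLELISM;
    }

    /** Counts how many distinct subtasks receive at least one of numRecords records. */
    static int subtasksUsed(int numRecords, int parallelism, boolean constantKey) {
        Set<Integer> used = new HashSet<>();
        for (int i = 0; i < numRecords; i++) {
            // constantKey mimics map(s -> new Tuple2(0, s)).keyBy(0): every record gets key 0
            Integer key = constantKey ? 0 : i;
            used.add(subtaskFor(key, parallelism));
        }
        return used.size();
    }

    public static void main(String[] args) {
        System.out.println("constant key:  " + subtasksUsed(10_000, 640, true) + " of 640 subtasks busy");
        System.out.println("distinct keys: " + subtasksUsed(10_000, 640, false) + " of 640 subtasks busy");
    }
}
```

With 640 subtasks, the constant key leaves 639 of them idle; keying by a field that actually varies (here simply the record index) spreads the load across essentially all of them.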

Best,
Aljoscha

> On 1. Mar 2018, at 09:25, Supun Kamburugamuve wrote:
> 
> Is there a way to avoid going through RocksDB? For this test application it seems 
> unnecessary, as we don't need fault tolerance and this is a streaming 
> case.
> 
> Thanks,
> Supun..
> 
> On Thu, Mar 1, 2018 at 11:55 AM, Stephan Ewen wrote:
> A few quick checks:
> 
>   - Do you properly set the parallelism?
>   - If you start 640 tasks (parallelism) and use the same key for 
> everything, that behaves like parallelism 1 (Piotr mentioned this).
> 
>   - Do you use the RocksDB state backend? If yes, try the FsStateBackend. It 
> looks like your state data type (CollectiveData) is very expensive to 
> serialize, and with RocksDB you get a back-and-forth serialization (off-heap 
> => on-heap, compute, on-heap => off-heap).
> 
> On Thu, Mar 1, 2018 at 4:32 PM, Supun Kamburugamuve wrote:
> Yes, the program runs fine; I can see it in the UI. Sorry, I didn't include the 
> part where execute() is called. 
> 
> Thanks,
> Supun..
> 
> On Thu, Mar 1, 2018 at 10:27 AM, Fabian Hueske wrote:
> Are you sure the program is doing anything at all?
> Do you call execute() on the StreamExecutionEnvironment?
> 
> 2018-03-01 15:55 GMT+01:00 Supun Kamburugamuve:
> Thanks Piotrek, 
> 
> I did it this way on purpose, to see how Flink performs. With 128000 messages 
> it takes an unreasonable amount of time for Flink to complete the operation. 
> With another framework the same operation completes in about 70 seconds for 
> 1000 messages of size 128000, while Flink takes hours.
> 
> Thanks,
> Supun.. 
> 
> On Thu, Mar 1, 2018 at 3:58 AM, Piotr Nowojski wrote:
> Hi,
> 
> First of all, learn what’s going on with your job: check the status of the 
> machines and the CPU/network usage on the cluster. If CPU is not ~100%, analyse what 
> is preventing the machines from working faster (network bottleneck, locking, 
> blocking operations etc.). If CPU is ~100%, profile the TaskManagers to see 
> what you can speed up.
> 
> A couple of questions about your example:
> - you create CollectiveData instances with size 128000 by default. Doesn’t that 
> mean that your records are gigantic? I cannot tell, since you didn’t provide the 
> full code.
> - you are mapping the data to new Tuple2(0, s) and 
> then keying by the first field, which is always 0. Probably all of the 
> records are ending up on one single machine.
> 
> Piotrek
> 
>> On 28 Feb 2018, at 17:20, Supun Kamburugamuve wrote:
>> 
>> Hi, 
>> 
>> I'm trying to run a simple benchmark on Flink streaming reduce. It seems to 
>> be very slow. Could you let me know if I'm doing something wrong?
>> 
>> Here is the program. I'm running this on 32 nodes with 20 tasks on each 
>> node, so the parallelism is 640.
>> 
>> public class StreamingReduce {
>>   int size;
>>   int iterations;
>>   StreamExecutionEnvironment env;
>>   String outFile;
>> 
>>   public StreamingReduce(int size, int iterations,
>>       StreamExecutionEnvironment env, String outFile) {
>>     this.size = size;
>>     this.iterations = iterations;
>>     this.env = env;
>>     this.outFile = outFile;
>>   }
>> 
>>   public void execute() {
>>     DataStream<CollectiveData> stringStream = env.addSource(
>>         new RichParallelSourceFunction<CollectiveData>() {
>>       int i = 1;
>>       int count = 0;
>>       int size = 0;
>>       int iterations = 1;
>> 
>>       @Override
>>       public void open(Configuration parameters) throws Exception {
>>         super.open(parameters);
>>         ParameterTool p = (ParameterTool)
>>             getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>>         size = p.getInt("size", 128000);
>>         iterations = p.getInt("itr", 1);
>>         System.out.println(" iterations: " + iterations + " size: " + size);
>>       }
>> 
>>       @Override
>>       public void run(SourceContext<CollectiveData> sourceContext) throws Exception {
>>         while (count < iterations) {
>>           CollectiveData i = new CollectiveData(size);
>>           sourceContext.collect(i);
>>           count++;
>>         }
>>       }
>> 
>>       @Override
>>       public void cancel() {
>>       }
>>     });
>> 
>>     stringStream.map(new RichMapFunction<CollectiveData, Tuple2<Integer, CollectiveData>>() {
>>       @Override
>>       public Tuple2<Integer, CollectiveData> map(CollectiveData s) throws Exception {
>>         return new Tuple2<>(0, s);
>>       }
>>     }).keyBy(0).reduce(new ReduceFunction<Tuple2<Integer, CollectiveData>>() {
>>       @Override
>>       public Tuple2<Integer, CollectiveData> reduce(Tuple2<Integer, CollectiveData> c1,
>> 
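Stephan's RocksDB point can be illustrated without Flink: with RocksDB, every state access deserializes the stored value and writes it back serialized, whereas a heap-based backend such as the FsStateBackend keeps the working state as a Java object. The sketch below assumes CollectiveData is roughly an int[] of size 128000 (its definition is not shown in the thread) and compares a reduce that round-trips its accumulator through bytes on every record with one that keeps it on the heap. The class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Compares a bytes-backed reduce (RocksDB-like) with a heap-backed one (names are hypothetical). */
final class StateRoundTrip {

    /** Serializes an int[] as [length, v0, v1, ...] using 4-byte big-endian ints. */
    static byte[] serialize(int[] value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(4 + 4 * value.length);
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value.length);
            for (int v : value) {
                out.writeInt(v);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static int[] deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int[] value = new int[in.readInt()];
            for (int i = 0; i < value.length; i++) {
                value[i] = in.readInt();
            }
            return value;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Element-wise sum; state exists only as bytes: deserialize, add, serialize for every record. */
    static int[] reduceSerialized(int[][] records) {
        byte[] state = null;
        for (int[] record : records) {
            int[] acc = (state == null) ? new int[record.length] : deserialize(state);
            for (int i = 0; i < record.length; i++) {
                acc[i] += record[i];
            }
            state = serialize(acc);
        }
        return (state == null) ? new int[0] : deserialize(state);
    }

    /** Same reduce, but the accumulator stays a heap object between records. */
    static int[] reduceOnHeap(int[][] records) {
        int[] acc = null;
        for (int[] record : records) {
            if (acc == null) {
                acc = new int[record.length];
            }
            for (int i = 0; i < record.length; i++) {
                acc[i] += record[i];
            }
        }
        return (acc == null) ? new int[0] : acc;
    }
}
```

Under that assumption, each round trip moves 4 + 4 × 128000 = 512,004 bytes in each direction, so 1000 records reduced under a single key serialize roughly 512 MB and deserialize about as much again, all on one subtask.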

Re: Task Slots allocation

2018-03-14 Thread Aljoscha Krettek
Hi,

Could you maybe send a screenshot from the dashboard that shows how tasks are 
being assigned to slots?

Aljoscha

> On 4. Mar 2018, at 18:06, vijayakumar palaniappan wrote:
> 
> I am using Flink 1.4.0 in standalone cluster mode.
> 
> I have a job with a graph like Source (parallelism 3) -> Filter (parallelism 
> 3) -> Map (parallelism 1) -> Sink (parallelism 3).
> 
> As I understand it, the number of task slots used should be bounded by the 
> maximum parallelism, which in this case is 3.
> 
> The behavior I observe is that, when more than the required number of slots (say 
> 10) are available in the cluster, this job uses 5 slots. I tried assigning the 
> same slot sharing group to all stream nodes, but it didn't help.
> 
> Is this a change in behavior in 1.4.0? If so, are there any workarounds to 
> force the old behavior?
> 
> 
> 
> -- 
> Thanks,
> -Vijay
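For reference, the slot arithmetic the question relies on can be written down as a small plain-Java model: with slot sharing, a job needs one slot per parallel instance of the widest operator in each slot sharing group, and operators in different groups never share slots, so the per-group maxima add up. The Op class and the group names are hypothetical. The model does not explain the 5 slots observed above; it only gives the number the dashboard can be compared against.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the slot-sharing rule (the Op type and group names are hypothetical). */
final class SlotEstimate {
    static final class Op {
        final String name;
        final int parallelism;
        final String slotSharingGroup;

        Op(String name, int parallelism, String slotSharingGroup) {
            this.name = name;
            this.parallelism = parallelism;
            this.slotSharingGroup = slotSharingGroup;
        }
    }

    /** Sum over slot sharing groups of the highest parallelism inside each group. */
    static int slotsNeeded(List<Op> ops) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Op op : ops) {
            maxPerGroup.merge(op.slotSharingGroup, op.parallelism, Math::max);
        }
        return maxPerGroup.values().stream().mapToInt(Integer::intValue).sum();
    }
}
```

For the Source(3) -> Filter(3) -> Map(1) -> Sink(3) job with everything in one group this gives 3; moving only the Map into its own group would raise it to 4.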



Re: Which test cluster to use for checkpointing tests?

2018-03-14 Thread Aljoscha Krettek
FYI, this is the Jira issue tracking this: 
https://issues.apache.org/jira/browse/FLINK-2491 


Aljoscha
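Until that lands, one workaround, sketched here under the assumption that the test can tolerate a job that only ends on cancellation, is to keep the source running after it has emitted its data, so the pipeline never starts to drain and checkpoints keep being triggered. The sketch models the run()/cancel() contract of a Flink SourceFunction in plain Java; the class name and the demo() helper are hypothetical. In a real Flink source the emit loop would live in run(SourceContext) and call ctx.collect() while holding ctx.getCheckpointLock().

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Plain-Java model of a source that stays alive after emitting its data (names are hypothetical). */
final class KeepAliveSource implements Runnable {
    private final List<String> records;
    private final Consumer<String> emit;
    private volatile boolean running = true;
    private volatile boolean emittedAll = false;

    KeepAliveSource(List<String> records, Consumer<String> emit) {
        this.records = records;
        this.emit = emit;
    }

    @Override
    public void run() {
        for (String record : records) {
            if (!running) {
                return;
            }
            emit.accept(record);
        }
        emittedAll = true;
        // Returning here would mark the source as finished; per the thread, checkpoints
        // stop at that point. Instead, idle until cancel() is called.
        while (running) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Must actually stop run(); with an empty cancel() this loop would never end. */
    void cancel() {
        running = false;
    }

    boolean hasEmittedAll() {
        return emittedAll;
    }

    /** Returns {aliveAfterEmitting, aliveAfterCancel, allRecordsEmitted}. */
    static boolean[] demo() {
        List<String> out = new CopyOnWriteArrayList<>();
        KeepAliveSource source = new KeepAliveSource(Arrays.asList("a", "b", "c"), out::add);
        Thread thread = new Thread(source);
        thread.start();
        try {
            long deadline = System.nanoTime() + 2_000_000_000L;
            while (!source.hasEmittedAll() && System.nanoTime() < deadline) {
                Thread.sleep(5);
            }
            Thread.sleep(50);
            boolean aliveAfterEmitting = thread.isAlive();
            source.cancel();
            thread.join(2_000);
            return new boolean[] {aliveAfterEmitting, thread.isAlive(), out.size() == 3};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```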

> On 6. Mar 2018, at 02:32, Nico Kruber  wrote:
> 
> There are still some upcoming changes to the network stack, but most of
> the heavy work is already through - you may track this under
> https://issues.apache.org/jira/browse/FLINK-8581
> 
> FLIP-6 is in a similar state and is currently only undergoing stability
> improvements/bug fixing. The architectural changes are merged now.
> 
> 
> Nico
> 
> On 06/03/18 11:24, Paris Carbone wrote:
>> Hey,
>> 
>> Indeed, checkpointing iterations and dealing with closed sources are 
>> orthogonal issues; that is why the latter is not part of FLIP-15. Though, 
>> you kind of need both to have meaningful checkpoints for jobs with iterations.
>> One has to do with correctness (checkpointing strongly connected components 
>> in the execution graph) and the other with termination (terminating the 
>> checkpointing protocol when certain tasks ‘finish’).
>> 
>> I am willing to help resolve the first issue, though I prefer to wait 
>> for the ongoing changes in the network model and FLIP-6 to be finalised before applying 
>> this change properly (are they?). 
>> 
>> Paris
>> 
>>> On 6 Mar 2018, at 10:51, Nico Kruber  wrote:
>>> 
>>> Hi Ken,
>>> sorry, I was misled by the fact that you are using iterations, and those
>>> were only documented for the DataSet API.
>>> 
>>> Running checkpoints with closed sources sounds like a more general thing
>>> than being part of the iterations rework of FLIP-15. I couldn't dig up
>>> anything on Jira regarding this improvement either.
>>> 
>>> @Stephan: is this documented somewhere?
>>> 
>>> 
>>> Nico
>>> 
>>> On 02/03/18 23:55, Ken Krugler wrote:
 Hi Stephan,
 
 Thanks for the update.
 
 So is support for “running checkpoints with closed sources” part
 of FLIP-15, or something separate?
 
 Regards,
 
 — Ken
 
> On Mar 1, 2018, at 9:07 AM, Stephan Ewen  > wrote:
> 
> @Ken The issue you are running into is that checkpointing currently works
> only until the job reaches the point where the pipeline
> starts to drain out, meaning when the sources are done. In your case,
> the source is done immediately, sending out only one tuple.
> 
> Running checkpoints with closed sources is something that's on the
> feature list and will come soon…
 
 
 http://about.me/kkrugler
 +1 530-210-6378
 
>>> 
>> 
> 



Re: Restart hook and checkpoint

2018-03-14 Thread Aljoscha Krettek
Hi,

Have you looked into fine-grained recovery? 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
 


Stefan (cc'ed) might be able to give you some pointers about the configuration.

Best,
Aljoscha

> On 6. Mar 2018, at 22:35, Ashish Pokharel wrote:
> 
> Hi Gordon,
> 
> The issue really is that we are trying to avoid checkpointing, as the datasets are 
> really heavy and all of the state is really transient in a few of our apps 
> (flushed within a few seconds). So the high volume/velocity and transient nature of 
> the state make those apps good candidates for not having checkpoints at all. 
> 
> We do have offsets committed to Kafka AND we have “some” tolerance for gap / 
> duplicate. However, we do want to handle “graceful” restarts / shutdown. For 
> shutdown, we have been taking savepoints (which works great) but for restart, 
> we just can’t find a way. 
> 
> Bottom line - we are trading off resiliency for resource utilization and 
> performance but would like to harden apps for production deployments as much 
> as we can.
> 
> Hope that makes sense.
> 
> Thanks, Ashish
> 
>> On Mar 6, 2018, at 10:19 PM, Tzu-Li Tai wrote:
>> 
>> Hi Ashish,
>> 
>> Could you elaborate a bit more on why you think the restart of all operators
>> leads to data loss?
>> 
>> When a restart occurs, Flink will restart the job from the latest complete
>> checkpoint.
>> All operator states will be reloaded with the state written in that checkpoint,
>> and the position of the input stream will also be rewound.
>> 
>> I don't think there is a way to force a checkpoint before restarting occurs,
>> but as I mentioned, that should not be required, because the last complete
>> checkpoint will be used.
>> Am I missing something in your particular setup?
>> 
>> Cheers,
>> Gordon
>> 
>> 
>> 
> 
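Gordon's description of recovery can be checked with a small plain-Java model (the names are hypothetical, not Flink API): a completed checkpoint stores the input position and the operator state together, and a failure restores both, so records processed after the checkpoint are simply processed again. Nothing is lost, but a sink without transactional or idempotent writes sees those records twice, which matches the duplicate tolerance Ashish mentions.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of restore-from-checkpoint with input rewind (names are hypothetical). */
final class CheckpointReplayModel {
    /** A completed checkpoint: the input position and the operator state, taken together. */
    static final class Checkpoint {
        final int inputOffset;
        final long sum;

        Checkpoint(int inputOffset, long sum) {
            this.inputOffset = inputOffset;
            this.sum = sum;
        }
    }

    /**
     * Runs a running-sum operator over input, checkpointing every checkpointEvery records and
     * failing once when the input offset reaches failAfter. Returns everything the sink received.
     */
    static List<Long> run(int[] input, int checkpointEvery, int failAfter) {
        List<Long> sink = new ArrayList<>();
        Checkpoint latest = new Checkpoint(0, 0L);
        int offset = 0;
        long sum = 0L;
        boolean failed = false;
        while (offset < input.length) {
            if (!failed && offset == failAfter) {
                failed = true;
                // Failure: in-memory state is lost; restore the state AND rewind the input together.
                offset = latest.inputOffset;
                sum = latest.sum;
                continue;
            }
            sum += input[offset++];
            sink.add(sum);
            if (offset % checkpointEvery == 0) {
                latest = new Checkpoint(offset, sum);
            }
        }
        return sink;
    }
}
```

For input 1, 2, 3, 4, 5 with a checkpoint every 2 records and a failure after the third record, the sink receives the running sums 1, 3, 6, 6, 10, 15: the value 6 is emitted twice, and the final state is still correct.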



Re: Record Delivery Guarantee with Kafka 1.0.0

2018-03-14 Thread Aljoscha Krettek
Hi,

How are you checking that records are missing? Flink should flush to Kafka and 
wait for all records to be flushed when performing a checkpoint.
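A plain-Java model of this, under simplifying assumptions: the producer only sends its in-memory batch when the checkpoint forces a flush (a real KafkaProducer also sends whenever a batch fills up or linger.ms expires), a checkpoint completes only after that flush has been acknowledged, and a crash discards the unsent batch. Because the source offset stored in the checkpoint predates the lost records, they are read and sent again after recovery. The names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of flush-on-checkpoint plus source rewind (names are hypothetical). */
final class FlushOnCheckpointModel {
    /**
     * Sends records 0..n-1, checkpointing every checkpointEvery records and crashing once when the
     * source offset reaches crashAfter. Returns the records the broker ends up with.
     */
    static List<Integer> brokerContents(int n, int checkpointEvery, int crashAfter) {
        List<Integer> broker = new ArrayList<>();  // records acknowledged by Kafka
        List<Integer> buffer = new ArrayList<>();  // producer's in-memory batch, not yet sent
        int checkpointedOffset = 0;                // source position stored in the last checkpoint
        int offset = 0;
        boolean crashed = false;
        while (offset < n) {
            if (!crashed && offset == crashAfter) {
                crashed = true;
                buffer.clear();               // TaskManager dies: the unsent batch is lost...
                offset = checkpointedOffset;  // ...but the source rewinds to the checkpoint
                continue;
            }
            buffer.add(offset++);
            if (offset % checkpointEvery == 0) {
                broker.addAll(buffer);        // checkpoint: flush and wait for all acks first
                buffer.clear();
                checkpointedOffset = offset;
            }
        }
        broker.addAll(buffer);                // final flush when the job finishes
        return broker;
    }
}
```

With 10 records, a checkpoint every 4 and a crash after 7, the broker ends up with records 0 through 9 exactly once; in a real setup, batches sent between checkpoints would show up as duplicates after recovery rather than as gaps.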

Best,
Aljoscha

> On 13. Mar 2018, at 21:31, Chirag Dewan wrote:
> 
> Hi,
> 
> Still stuck around this. 
> 
> My understanding is that this is something Flink can't handle. If the batch size 
> of the Kafka producer is non-zero (which it ideally should be), there will be 
> in-memory records and data loss (in boundary cases). The only way I can handle this 
> with Flink is through my checkpointing interval, which flushes any buffered records. 
> 
> Is my understanding correct here? Or am I still missing something?  
> 
> thanks,
> 
> Chirag  
> 
> On Monday, 12 March, 2018, 12:59:51 PM IST, Chirag Dewan wrote:
> 
> 
> Hi,
> 
> I am trying to use the Kafka 0.11 sink with the AT_LEAST_ONCE semantic and am 
> experiencing some data loss on TaskManager failure.
> 
> It's a simple job with parallelism=1 and a single TaskManager. After a few 
> checkpoints (Kafka flushes), I kill my TaskManager, which runs as a 
> container on Docker Swarm. 
> 
> I observe a small number of records, usually 4-5, being lost on the Kafka 
> broker (a 1-broker cluster, 1 topic with 1 partition).
> 
> My FlinkKafkaProducer config is as follows: 
> 
> batch.size=default(16384)
> retries=3
> max.in.flight.requests.per.connection=1
> acks=1
> 
> As I understand it, all the messages batched by the 
> KafkaProducer (RecordAccumulator) in the memory buffer are lost. Is this why 
> I can't see my records on the broker? Or is there something I am doing 
> terribly wrong? Any help appreciated.
> 
> TIA,
> 
> Chirag
> 
>  
>  



Re: state backend configuration

2018-03-14 Thread Aljoscha Krettek
Hi,

This is not possible on versions of Flink prior to 1.5, but starting with that 
(as yet unreleased) version you'll be able to configure this via 
state.backend.incremental: true.

Settings that you make in code take precedence over the configuration file.

Best,
Aljoscha

> On 14. Mar 2018, at 02:33, Jayant Ameta  wrote:
> 
> Hi,
> Can I enable incremental checkpoint for rocksdb via flink-conf.yaml file?
> 
> Also, I can set state.backend via the conf file as well as in my code using 
> env.setStateBackend(). Which of these takes precedence?
> 
> Jayant Ameta



design flink cep pattern for transaction life cycle

2018-03-14 Thread shishal singh


I have a scenario where I need to detect the following pattern for a
transaction.

My basic transaction event looks like this => *Transaction(id, status, time)*

A transaction goes through the following states:

BEGIN -> COMPLETE (within 5 days; if this comes, there is no need to wait
for PROCESSING)
BEGIN -> PROCESSING (within 1 day)
PROCESSING -> PROCESSING (within 1 day; this can happen for up to 5 days)
PROCESSING -> COMPLETE (within 1 day)

In plain text: when a transaction is initiated, it can go from
BEGIN to COMPLETE within 5 days. But if it cannot be completed within 1 day,
then it keeps sending a PROCESSING event every day. This can happen for
up to 5 days, and then it finally sends a COMPLETE event.

I want to raise an alert if there is any deviation from this pattern.
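Whether this ends up as a FlinkCEP pattern or a keyed process function, the rules first need a precise reading. The plain-Java sketch below assumes one reading: every transaction is BEGIN, then zero or more PROCESSING, then COMPLETE; consecutive events are at most 1 day apart; and COMPLETE arrives at most 5 days after BEGIN. It checks the time-ordered events of a single transaction id (in Flink this would run per key after keying by the id) and returns an alert reason. The two checks against nowMillis play the role a timeout (for example a PatternTimeoutFunction) would play. All names are hypothetical, and this is not FlinkCEP itself.

```java
import java.util.List;
import java.util.Optional;

/** Per-transaction lifecycle check under one assumed reading of the rules (names are hypothetical). */
final class TransactionLifecycleCheck {
    enum Status { BEGIN, PROCESSING, COMPLETE }

    static final class Event {
        final Status status;
        final long timeMillis;

        Event(Status status, long timeMillis) {
            this.status = status;
            this.timeMillis = timeMillis;
        }
    }

    static final long HOUR = 3_600_000L;
    static final long MAX_GAP = 24 * HOUR;        // assumed: at most 1 day between consecutive events
    static final long MAX_TOTAL = 5 * 24 * HOUR;  // assumed: COMPLETE at most 5 days after BEGIN

    /** Convenience constructor: an event the given number of hours after time 0. */
    static Event at(Status status, long hours) {
        return new Event(status, hours * HOUR);
    }

    /**
     * Checks the time-ordered events of ONE transaction id as of nowMillis. Returns an alert
     * reason, or empty if the transaction is complete or still within its limits.
     */
    static Optional<String> check(List<Event> events, long nowMillis) {
        if (events.isEmpty()) {
            return Optional.empty();
        }
        Event begin = events.get(0);
        if (begin.status != Status.BEGIN) {
            return Optional.of("first event is " + begin.status);
        }
        Event prev = begin;
        for (Event e : events.subList(1, events.size())) {
            if (prev.status == Status.COMPLETE) {
                return Optional.of("event after COMPLETE");
            }
            if (e.status == Status.BEGIN) {
                return Optional.of("repeated BEGIN");
            }
            if (e.timeMillis - prev.timeMillis > MAX_GAP) {
                return Optional.of("more than 1 day before " + e.status);
            }
            if (e.timeMillis - begin.timeMillis > MAX_TOTAL) {
                return Optional.of("longer than 5 days");
            }
            prev = e;
        }
        if (prev.status != Status.COMPLETE) {
            // These checks stand in for a timeout: the next event is overdue.
            if (nowMillis - prev.timeMillis > MAX_GAP) {
                return Optional.of("no event for more than 1 day after " + prev.status);
            }
            if (nowMillis - begin.timeMillis > MAX_TOTAL) {
                return Optional.of("not COMPLETE within 5 days");
            }
        }
        return Optional.empty();
    }
}
```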

The number of transactions per day could be 10-50 million. I am evaluating
FlinkCEP for this problem. I have gone through the docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/cep.html

Is it possible to design this pattern using Flink CEP?

Also, if I am not wrong, I need to use a PatternTimeoutFunction to trigger
the alert.

Regards,
Shishal


Re: flink sql: "slow" performance on Over Window aggregation with time attribute set to event time

2018-03-14 Thread Fabian Hueske
Hi,

It is typically not a good idea to generate watermarks based on system
(machine) time. Watermarks should be data-driven.
As soon as the clock of one of your machines is 1 second behind the other
machines, its watermarks will also be 1 second behind, and hence so will the
event time of the complete stream.
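This can be put in numbers with a small plain-Java sketch. It assumes each source derives its watermark from its local clock minus a fixed 10 ms lag (as in the setup described below), that a record's event timestamp equals the true time at which it arrives, and that an event-time operator emits a record roughly once the combined watermark, the minimum over its inputs, reaches that timestamp. The names are hypothetical.

```java
import java.util.Arrays;

/** Plain-Java arithmetic for clock-based watermarks across parallel sources (names are hypothetical). */
final class WatermarkSkew {

    /** Watermark a source emits if it uses its local clock minus a fixed lag. */
    static long clockWatermark(long localClockMillis, long lagMillis) {
        return localClockMillis - lagMillis;
    }

    /** An operator with several inputs advances its event time to the minimum of their watermarks. */
    static long combinedWatermark(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    /** Approximate time a record must wait until the combined watermark reaches its timestamp. */
    static long minWaitMillis(long eventTimestamp, long combinedWatermark) {
        return Math.max(0L, eventTimestamp - combinedWatermark);
    }

    public static void main(String[] args) {
        long trueTime = 1_000_000L;
        long lag = 10L;
        long watermark = combinedWatermark(
                clockWatermark(trueTime, lag),
                clockWatermark(trueTime, lag),
                clockWatermark(trueTime - 1_000L, lag)); // this machine's clock is 1 s behind
        System.out.println("combined watermark: " + watermark);
        System.out.println("wait for a record stamped now: " + minWaitMillis(trueTime, watermark) + " ms");
    }
}
```

With three sources, one of whose clocks runs 1 s behind, the combined watermark trails the true time by about 1010 ms, so even a record from a perfectly punctual source waits roughly that long. The sketch ignores how often watermarks are emitted; periodic assigners only emit one per auto-watermark interval (configurable via ExecutionConfig#setAutoWatermarkInterval), which adds further delay.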

Best, Fabian

2018-03-14 18:40 GMT+01:00 Yan Zhou [FDS Science] :

> Hi Fabian,
>
>
> Thank you for answering the question. However, I don't think it explains
> my situation. The source tasks' watermarks are set to 10 ms behind the
> system time; an assigner allowing a fixed amount of lateness
> is used. So even the slowest source task is not that slow.
>
>
> Best
>
> Yan
>
>
>
>
>
> --
> *From:* Fabian Hueske 
> *Sent:* Wednesday, March 14, 2018 3:28 AM
> *To:* Yan Zhou [FDS Science]
> *Cc:* user@flink.apache.org
> *Subject:* Re: flink sql: "slow" performance on Over Window aggregation
> with time attribute set to event time
>
> Hi,
>
> Flink advances watermarks based on all parallel source tasks. If one of
> the source tasks lags behind the others, the event time progresses as
> determined by the "slowest" source task.
> Hence, records ingested from a faster task might have a higher processing
> latency.
>
> Best, Fabian
>
> 2018-03-14 1:36 GMT+01:00 Yan Zhou [FDS Science] :
>
> Hi,
>
> I am using Flink SQL in my application. It simply reads records from a Kafka
> source, converts them to a table, then runs a query that performs an over window
> aggregation for each record. A time-lag watermark assigner with a 10ms time lag
> is used.
>
> The performance is not ideal. The end-to-end latency, which is the
> difference between the time a record arrives in the Flink source and the time
> the record arrives in the Flink sink, is around 250ms (median). Please note
> that my query, which is an over window aggregation, generates one result
> for each input record. I was expecting it to be less than 100ms. I increased
> the number of queries 100-fold and still had the same median end-to-end
> latency, with plenty of CPU and memory available. It seems to me that
> something is holding my application back.
>
> However, when I use processing time as the time attribute without changing
> anything else, the latency is reduced to 50ms. I understand that in general
> using processing time should be faster. But in my event-time test, the
> time lag is set to only 10ms, which should mean the operators will almost
> immediately process the events after they arrive. And the classes which
> calculate the over window aggregation (ProcTimeBoundedRangeOver,
> RowTimeBoundedRowsOver, etc.) basically have the same logic. Why does
> using processing time or event time make such a big difference in
> end-to-end latency? And what is holding my application back when the time
> attribute is set to event time?
>
> Below is my cluster and application setup and thank you for your time.
>
>
> *The cluster:*
>
> *The cluster runs in standalone mode with 7 servers. Each server has 24
> cores and 240 GB memory. There is 1 JobManager and there are 6 TaskManagers. Each
> TaskManager is allocated 12 cores, 120 GB memory and 6 task
> slots. HDFS runs over SSDs on these servers as well. *
>
>
> *The application:*
>
> *When an event arrives in Flink from Kafka, the application sets an ingestionTs
> on the event. When the event arrives at the sink, the processing latency
> is calculated as System.currentTimeMillis() - ingestionTs. This value is
> considered the end-to-end latency; it is recorded in a histogram metric and can
> be viewed in the Flink web UI. The RocksDB state backend is used. A time-lag
> watermark assigner with a time lag of 10ms is used. *
>
>
> *Custom Source *
> *-> Flat Map *
> *-> Timestamps/Watermarks *
> *-> (from: (id, ip, type, ingestionTs, eventTs) -> select: (id, ip, type,
> ingestionTs, eventTs))*
> *--HASH-->*
> *over:( PARTITION BY: ip,*
>
> *ORDER BY: eventTs, *
>
> *RANGEBETWEEN 8640 PRECEDING AND CURRENT ROW, *
>
> *select: (id, ip, eventTs, COUNT(*) AS w0$o0), ingestionTs) *
>
> *-> select: (id, eventTs, w0$o0 AS CNT), ingestionTs) *
> *-> to: Tuple2 *
> *-> Sink: Unnamed*
>
> *select id, eventTs, count(*) over (partition by id order by eventTs
> range between interval '24' hour preceding and current row) as cnt1 from
> myTable.*
>
>
>
>
>


Re: Too many open files on Bucketing sink

2018-03-14 Thread Felix Cheung
I have seen this before as well.

My workaround was to limit the parallelism, but that has the unfortunate 
effect of also limiting the number of processing tasks (and so slowing things 
down).

Another alternative is to have bigger buckets (and thus fewer buckets).

Not sure if there is a good solution.
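One plausible contributor to the fd count is that each parallel sink instance keeps a part file open for every bucket it is currently writing to, so open files can grow roughly with parallelism × active buckets. The BucketingSink's own settings for closing inactive buckets (setInactiveBucketThreshold and setInactiveBucketCheckInterval) are worth checking first. The general technique of capping open files is sketched below in plain Java, as an illustration under stated assumptions rather than how the BucketingSink works: keep at most maxOpen writers open and close the least recently used one when a new bucket needs a writer. The class is hypothetical, and AutoCloseable stands in for a real part-file writer.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Keeps at most maxOpen bucket writers open, closing the least recently used (hypothetical class). */
final class BoundedWriterCache<K, W extends AutoCloseable> {
    private final Function<K, W> opener;
    private final LinkedHashMap<K, W> open;
    private int closedCount = 0;

    BoundedWriterCache(int maxOpen, Function<K, W> opener) {
        if (maxOpen < 1) {
            throw new IllegalArgumentException("maxOpen must be >= 1");
        }
        this.opener = opener;
        // accessOrder = true: iteration runs from least to most recently used entry
        this.open = new LinkedHashMap<K, W>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, W> eldest) {
                if (size() <= maxOpen) {
                    return false;
                }
                closeQuietly(eldest.getValue());
                return true;
            }
        };
    }

    /** Returns the writer for a bucket, opening it (and possibly evicting another) if needed. */
    W writerFor(K bucket) {
        W writer = open.get(bucket); // get() also marks the bucket as most recently used
        if (writer == null) {
            writer = opener.apply(bucket);
            open.put(bucket, writer); // may evict and close the least recently used writer
        }
        return writer;
    }

    boolean isOpen(K bucket) {
        return open.containsKey(bucket);
    }

    int openCount() {
        return open.size();
    }

    int closedCount() {
        return closedCount;
    }

    void closeAll() {
        open.values().forEach(this::closeQuietly);
        open.clear();
    }

    private void closeQuietly(W writer) {
        try {
            writer.close();
        } catch (Exception e) {
            // a real sink would log this and probably fail the task
        }
        closedCount++;
    }
}
```

The trade-off is that a closed bucket gets a new part file when data for it arrives again, so a tight cap means more, smaller files on S3.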


From: galantaa 
Sent: Tuesday, March 13, 2018 7:08:01 AM
To: user@flink.apache.org
Subject: Too many open files on Bucketing sink

Hey all,
I'm using the BucketingSink with a bucketer that creates a partition per customer
per day.
I sink the files to S3.
It is supposed to work on around 500 files at the same time (according to my
partitioning).

I have a critical problem of 'Too many open files'.
I've started two TaskManagers, each with 16 slots. I've checked how many open
files (or file descriptors) exist with 'lsof | wc -l', and it reached
over a million files on each TaskManager!

After that, I decreased the number of task slots to 8 (4 in each TaskManager),
and the concurrency dropped.
Checking 'lsof | wc -l' gave around 250k files on each machine.
I also checked how many actual files exist in my tmp dir (the sink works on the
files there before uploading them to S3): around 3000.

I think that each task slot works with several threads (maybe 16?), and each
thread holds an fd for the actual file, and that's how the numbers get so
high.

Is that a known problem? Is there anything I can do?
For now, I filter just 10 customers and it works great, but I have to find a
real solution so I can stream all the data.
Maybe I can also work with a single task slot per machine, but I'm not sure
this is a good idea.

Thank you very much,
Alon





Re: Question regarding effect of 'restart-strategy: none' on Flink (1.4.1) JobManager HA

2018-03-14 Thread Jash, Shaswata (Nokia - IN/Bangalore)
Hi Timo,

My observation was based on standalone cluster.

Regards,
Shaswata




From: Timo Walther
Sent: Wednesday, March 14, 22:11
Subject: Re: Question regarding effect of 'restart-strategy: none' on Flink 
(1.4.1) JobManager HA
To: user@flink.apache.org


Hi Shaswata,

are you using a standalone Flink cluster, or what does your deployment look 
like? E.g. YARN has its own restart attempts [1].

Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html#yarn-cluster-high-availability


On 14.03.18 at 13:20, Jash, Shaswata (Nokia - IN/Bangalore) wrote:
Hello Flink-Experts,

I am observing that even after setting ‘restart-strategy: none’ in a Flink 
JobManager HA (High Availability) setup, the JobManager still restarts the 
running jobs after failover. Is this the expected behavior? The jobs do not use 
any checkpoints/savepoints. My understanding was that, with JobManager HA, the new 
JobManager would resubmit a job after failover only if a restart strategy other 
than 'none' was configured. I am using Flink 1.4.1.

Can you please share your views?

Regards,
Shaswata





Re: flink sql: "slow" performance on Over Window aggregation with time attribute set to event time

2018-03-14 Thread Yan Zhou [FDS Science]
Hi Fabian,


Thank you for answering the question. However, I don't think it explains my 
situation. The source tasks' watermarks are set to 10 ms behind the system time; 
an assigner allowing a fixed amount of lateness is used. So even the slowest 
source task is not that slow.


Best

Yan





From: Fabian Hueske 
Sent: Wednesday, March 14, 2018 3:28 AM
To: Yan Zhou [FDS Science]
Cc: user@flink.apache.org
Subject: Re: flink sql: "slow" performance on Over Window aggregation with time 
attribute set to event time

Hi,

Flink advances watermarks based on all parallel source tasks. If one of the 
source tasks lags behind the others, the event time progresses as determined by 
the "slowest" source task.
Hence, records ingested from a faster task might have a higher processing 
latency.

Best, Fabian

2018-03-14 1:36 GMT+01:00 Yan Zhou [FDS Science]:

Hi,

I am using Flink SQL in my application. It simply reads records from a Kafka 
source, converts them to a table, then runs a query that performs an over window 
aggregation for each record. A time-lag watermark assigner with a 10ms time lag is used.

The performance is not ideal. The end-to-end latency, which is the difference 
between the time a record arrives in the Flink source and the time the record 
arrives in the Flink sink, is around 250ms (median). Please note that my query, 
which is an over window aggregation, generates one result for each input 
record. I was expecting it to be less than 100ms. I increased the number of 
queries 100-fold and still had the same median end-to-end latency, with plenty of 
CPU and memory available. It seems to me that something is holding my 
application back.

However, when I use processing time as the time attribute without changing anything 
else, the latency is reduced to 50ms. I understand that in general using 
processing time should be faster. But in my event-time test, the time lag 
is set to only 10ms, which should mean the operators will almost immediately 
process the events after they arrive. And the classes which calculate the over 
window aggregation (ProcTimeBoundedRangeOver, RowTimeBoundedRowsOver, etc.) 
basically have the same logic. Why does using processing time or event time 
make such a big difference in end-to-end latency? And what is holding my 
application back when the time attribute is set to event time?

Below is my cluster and application setup and thank you for your time.


The cluster:

The cluster runs in standalone mode with 7 servers. Each server has 24 cores and 
240 GB memory. There is 1 JobManager and there are 6 TaskManagers. Each TaskManager 
is allocated 12 cores, 120 GB memory and 6 task slots. HDFS runs 
over SSDs on these servers as well.


The application:

When an event arrives in Flink from Kafka, the application sets an ingestionTs on 
the event. When the event arrives at the sink, the processing latency is calculated 
as System.currentTimeMillis() - ingestionTs. This value is considered the 
end-to-end latency; it is recorded in a histogram metric and can be viewed in the Flink 
web UI. The RocksDB state backend is used. A time-lag watermark assigner with a time 
lag of 10ms is used.


Custom Source
-> Flat Map
-> Timestamps/Watermarks
-> (from: (id, ip, type, ingestionTs, eventTs) -> select: (id, ip, type, 
ingestionTs, eventTs))
--HASH-->
over:( PARTITION BY: ip,
ORDER BY: eventTs,
RANGEBETWEEN 8640 PRECEDING AND CURRENT ROW,
select: (id, ip, eventTs, COUNT(*) AS w0$o0), ingestionTs)
-> select: (id, eventTs, w0$o0 AS CNT), ingestionTs)
-> to: Tuple2
-> Sink: Unnamed

select id, eventTs, count(*) over (partition by id order by eventTs range 
between interval '24' hour preceding and current row) as cnt1 from myTable.





Re: Question regarding effect of 'restart-strategy: none' on Flink (1.4.1) JobManager HA

2018-03-14 Thread Timo Walther

Hi Shaswata,

are you using a standalone Flink cluster, or what does your deployment 
look like? E.g. YARN has its own restart attempts [1].


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html#yarn-cluster-high-availability



On 14.03.18 at 13:20, Jash, Shaswata (Nokia - IN/Bangalore) wrote:


Hello Flink-Experts,

I am observing that even after setting ‘restart-strategy: none’ in a 
Flink JobManager HA (High Availability) setup, the JobManager still 
restarts the running jobs after failover. Is this the 
expected behavior? The jobs do not use any checkpoints/savepoints. My 
understanding was that, with JobManager HA, the new JobManager would 
resubmit a job after failover only if a restart strategy other than 
'none' was configured. I am using Flink 1.4.1.


Can you please share your views?

Regards,

Shaswata





Re: Flink SSL Setup on a standalone cluster

2018-03-14 Thread Timo Walther

Hi Vinay,

do you have any exception or log entry that describes the failure?

Regards,
Timo


On 14.03.18 at 15:51, Vinay Patil wrote:

Hi,

I have a keystore for each of the 4 nodes in the cluster and a respective 
truststore. The cluster is configured correctly with SSL; I verified this 
by accessing the JobManager using https, and I also see the TM path as 
akka.ssl.tcp. However, the job is not getting submitted to the cluster.


I am not allowed to import the certificate into the Java default 
truststore, so I have provided the truststore and keystore as JVM args to 
the job.


Is there any other configuration I should do so that the job gets submitted?

Regards,
Vinay Patil





Re: activemq connector not working..

2018-03-14 Thread Timo Walther

Hi Puneet,

are you running this job on the cluster or locally in your IDE?

Regards,
Timo


On 14.03.18 at 13:49, Puneet Kinra wrote:

Hi

I used the Apache Bahir connector; the code is below. The job finishes
without generating any output, whereas ideally it should keep on running.



import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.activemq.AMQSource;
import org.apache.flink.streaming.connectors.activemq.AMQSourceConfig;
import org.apache.flink.streaming.connectors.activemq.DestinationType;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author puneet
 */
public class TestAMQ {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        AMQSourceConfig sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder()
                .setConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616"))
                .setDestinationName("test")
                .setDeserializationSchema(new SimpleStringSchema())
                .setDestinationType(DestinationType.QUEUE)
                .build();
        DataStream<String> messageStream = env.addSource(new AMQSource(sourceConfig));

        messageStream.print();
        env.execute();
    }
}


--
Cheers,
Puneet Kinra







Flink SSL Setup on a standalone cluster

2018-03-14 Thread Vinay Patil
Hi,

I have a keystore for each of the 4 nodes in the cluster and a respective truststore.
The cluster is configured correctly with SSL; I verified this by accessing the
JobManager using https, and I also see the TM path as akka.ssl.tcp. However,
the job is not getting submitted to the cluster.

I am not allowed to import the certificate into the Java default truststore, so
I have provided the truststore and keystore as JVM args to the job.

Is there any other configuration I should do so that the job gets submitted?

Regards,
Vinay Patil


Re: PartitionNotFoundException when restarting from checkpoint

2018-03-14 Thread Seth Wiesman
Hit send too soon.

Having spent some more time on this, it appears that ZooKeeper, being in a bad 
state, was unable to track a downed Kafka broker. This investigation has been 
very much trial and error up to this point, so please let me know if I seem way off 
base ☺


Seth Wiesman| Software Engineer
4 World Trade Center, 46th Floor, New York, NY 10007
swies...@mediamath.com



From: Seth Wiesman 
Date: Wednesday, March 14, 2018 at 10:14 AM
To: Fabian Hueske, Stefan Richter
Cc: "user@flink.apache.org" 
Subject: Re: PartitionNotFoundException when restarting from checkpoint

Unfortunately the stack trace was swallowed by the Java timer in the 
LocalInputChannel [1]; the real error is forwarded to the main thread, but I 
couldn’t figure out how to see it in my logs.

However, I believe I am close to having a reproducible example. Run a Flink 1.4 
DataStream job that sinks to Kafka 0.11 and cancel it with a savepoint. If you then shut 
down the Kafka daemon on a single broker but keep the REST proxy up, you should 
see this error when you resume.

[1] 
https://github.com/apache/flink/blob/fa024726bb801fc71cec5cc303cac1d4a03f555e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java#L151




From: Fabian Hueske 
Date: Tuesday, March 13, 2018 at 8:02 PM
To: Seth Wiesman, Stefan Richter
Cc: "user@flink.apache.org" 
Subject: Re: PartitionNotFoundException when restarting from checkpoint

Hi Seth,
Thanks for sharing how you resolved the problem!
The problem might have been related to Flink's key groups, which are used to 
assign key ranges to tasks.
Not sure why this would be related to ZooKeeper being in a bad state. Maybe 
Stefan (in CC) has an idea about the cause.
Also, it would be helpful if you could share the stacktrace of the exception 
(in case you still have it).
Best, Fabian

2018-03-13 14:35 GMT+01:00 Seth Wiesman:
It turns out the issue was due to our ZooKeeper installation being in a bad 
state. I am not clear enough on Flink’s networking internals to explain how 
this manifested as a PartitionNotFoundException, but hopefully this can 
serve as a starting point for others who run into the same issue.




From: Seth Wiesman
Date: Friday, March 9, 2018 at 11:53 AM
To: "user@flink.apache.org"
Subject: PartitionNotFoundException when restarting from checkpoint

Hi,

We are running Flink 1.4.0 with a YARN deployment on EC2 instances, RocksDB 
and incremental checkpointing. Last night a job failed and became stuck in a 
restart cycle with a PartitionNotFoundException. We tried restarting from the checkpoint on a 
fresh Flink session with no luck. Looking through the logs, we can see that the 
specified partition is never registered with the ResultPartitionManager.

My questions are:

1)  Are partitions part of the state, or are they ephemeral to the job?

2)  If they are not part of the state, where would the task managers be getting 
that partition ID to begin with?

3)  Right now we are logging everything under 
org.apache.flink.runtime.io.network. Is there anywhere else to look?

Thank you,






Re: PartitionNotFoundException when restarting from checkpoint

2018-03-14 Thread Seth Wiesman
Unfortunately, the stack trace was swallowed by the Java timer in the 
LocalInputChannel [1]. The real error is forwarded to the main thread, but I 
couldn't figure out how to see it in my logs.

However, I believe I am close to having a reproducible example: run a Flink 1.4 
DataStream job that sinks to Kafka 0.11 and cancel it with a savepoint. If you then shut 
down the Kafka daemon on a single broker but keep the REST proxy up, you should 
see this error when you resume.

[1] https://github.com/apache/flink/blob/fa024726bb801fc71cec5cc303cac1d4a03f555e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java#L151


[Proposal] CEP library changes - review request

2018-03-14 Thread Shailesh Jain
Hi,

We've been facing issues* with watermarks not being supported per key, which led
us to:

Either (a) run the job in processing time on a KeyedStream, compromising
on use cases that revolve around catching time-based patterns,
or (b) run the job in event time with multiple data streams (one data stream
per key), which is not scalable, as the number of operators grows linearly
with the number of keys.

To address this, we've made a quick proof-of-concept change in the
AbstractKeyedCEPPatternOperator that allows the NFAs to progress based on
timestamps extracted from the events arriving at the operator (and not
from the watermarks). We've tested it against our use case and are seeing a
significant improvement in memory usage without compromising on the
watermark functionality.
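The idea can be modeled outside of Flink. The plain-Java sketch below is a hypothetical illustration, not the code on the branch: each key keeps its own clock, advanced only by the timestamps of that key's events, and partial matches for the key are pruned once they fall out of the pattern window. The class name, the window parameter and the startsMatch flag are assumptions made for the example. Such a per-key clock only behaves well if the events of a key arrive roughly in timestamp order, since nothing here reorders them the way the watermark-based buffering in event-time mode does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of per-key event time for pattern matching. Each key
 * advances its own clock from its own events' timestamps, and partial matches
 * of that key that can no longer complete within the window are dropped.
 * This is not AbstractKeyedCEPPatternOperator; it only illustrates the idea.
 */
final class PerKeyClock {

    private final long windowMillis;
    private final Map<String, Long> clockByKey = new HashMap<>();
    private final Map<String, List<Long>> partialMatchStarts = new HashMap<>();

    PerKeyClock(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Handles one event; startsMatch says whether it opens a new partial match. */
    void onEvent(String key, long timestamp, boolean startsMatch) {
        // The key's clock only moves forward, driven by its own events.
        long clock = Math.max(clockByKey.getOrDefault(key, Long.MIN_VALUE), timestamp);
        clockByKey.put(key, clock);

        List<Long> starts = partialMatchStarts.computeIfAbsent(key, k -> new ArrayList<>());
        if (startsMatch) {
            starts.add(timestamp);
        }
        // Prune partial matches whose window has already ended for this key.
        starts.removeIf(start -> start + windowMillis < clock);
    }

    /** Number of partial matches currently kept for the key. */
    int openPartialMatches(String key) {
        return partialMatchStarts.getOrDefault(key, Collections.emptyList()).size();
    }
}
```

In this model a key whose events keep flowing frees its expired partial matches independently of other keys; the flip side is that a key that stops receiving events never advances its clock, so its partial matches are never pruned.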

It would be really helpful if someone from the CEP dev group could take a look
at this branch, https://github.com/jainshailesh/flink/commits/cep_changes,
provide comments on the approach taken, and maybe guide us on the next
steps for taking it forward.

Thanks,
Shailesh

* Links to previous email threads related to the same issue:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Question-on-event-time-functionality-using-Flink-in-a-IoT-usecase-td18653.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Correlation-between-number-of-operators-and-Job-manager-memory-requirements-td18384.html


activemq connector not working..

2018-03-14 Thread Puneet Kinra
Hi

I used the Apache Bahir connector; the code is below. The job finishes
without generating any output, but ideally it should keep running.


import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.activemq.AMQSource;
import org.apache.flink.streaming.connectors.activemq.AMQSourceConfig;
import org.apache.flink.streaming.connectors.activemq.DestinationType;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author puneet
 */
public class TestAMQ {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Read String messages from the ActiveMQ queue "test" on the local broker.
        AMQSourceConfig sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder()
                .setConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616"))
                .setDestinationName("test")
                .setDeserializationSchema(new SimpleStringSchema())
                .setDestinationType(DestinationType.QUEUE)
                .build();

        DataStream<String> messageStream = env.addSource(new AMQSource(sourceConfig));
        messageStream.print();
        env.execute();
    }
}


-- 
Cheers,

Puneet Kinra

Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com

e-mail: puneet.ki...@customercentria.com


Re: Global Window, Trigger and Watermarks, Parallelism

2018-03-14 Thread dim5b
By adding AfterMatchSkipStrategy.skipPastLastEvent(), it returns what I
want.

Is there a way to track/emit "ongoing" events, i.e. before the pattern matches
the end event type?





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Question regarding effect of 'restart-strategy: none' on Flink (1.4.1) JobManager HA

2018-03-14 Thread Jash, Shaswata (Nokia - IN/Bangalore)
Hello Flink-Experts,

I am observing that even after setting 'restart-strategy: none' in a Flink 
JobManager HA (high availability) setup, the JobManager still restarts the 
running jobs after a failover. Is this the expected behavior? The jobs 
do not use any checkpoints/savepoints. My understanding was that JobManager HA 
combined with a restart strategy other than 'none' is what ensures that jobs are 
resubmitted by the new JobManager after failover. I am using Flink 1.4.1.

Can you please share your views?

Regards,
Shaswata


Flink CEP pattern with multiple temporal constraint

2018-03-14 Thread shishal singh
Hi Team,

I have a scenario where I need to detect the following pattern for a
transaction.

My basic transaction event looks like this: *Transaction(id, status,
eventTime)*

A transaction goes through the following states:

*BEGIN -> COMPLETE (within 5 days; if this comes, there is no need to wait for
PROCESSING)*
*BEGIN -> PROCESSING (within 1 day)*
*PROCESSING -> PROCESSING (within 1 day; this can happen for up to 5 days)*
*PROCESSING -> COMPLETE (within 1 day)*


In plain text: when a transaction is initiated, it can go from
BEGIN to COMPLETE within 5 days. But if it cannot be completed within 1 day,
it keeps sending a PROCESSING event every day. This can happen for
up to 5 days, and then it finally sends a COMPLETE event.

I want to raise an alert per transaction if there is any deviation from this pattern.
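Before mapping this onto Flink CEP, it can help to pin the rules down precisely. The following plain-Java sketch encodes one literal reading of the transitions listed above: each allowed transition with its maximum gap, at most 5 days from BEGIN in total, and a timeout check against a given "now" in the spirit of a PatternTimeoutFunction. The class and method names are hypothetical, there is no Flink dependency, and the events of one transaction are assumed to be ordered by eventTime.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical validator for one transaction's events; not a Flink CEP pattern. */
final class TransactionRules {

    enum Status { BEGIN, PROCESSING, COMPLETE }

    static final class Event {
        final Status status;
        final long eventTime; // epoch millis

        Event(Status status, long eventTime) {
            this.status = status;
            this.eventTime = eventTime;
        }
    }

    static final long DAY = 24L * 60 * 60 * 1000;

    /** Maximum allowed gap for a transition, or -1 if the transition is not allowed. */
    static long maxGap(Status from, Status to) {
        if (from == Status.BEGIN && to == Status.COMPLETE) return 5 * DAY;
        if (from == Status.BEGIN && to == Status.PROCESSING) return DAY;
        if (from == Status.PROCESSING && to == Status.PROCESSING) return DAY;
        if (from == Status.PROCESSING && to == Status.COMPLETE) return DAY;
        return -1;
    }

    /** Describes the first rule violation, or returns empty if none is found as of nowMillis. */
    static Optional<String> firstViolation(List<Event> events, long nowMillis) {
        if (events.isEmpty() || events.get(0).status != Status.BEGIN) {
            return Optional.of("transaction does not start with BEGIN");
        }
        long begin = events.get(0).eventTime;
        for (int i = 1; i < events.size(); i++) {
            Event prev = events.get(i - 1);
            Event cur = events.get(i);
            long limit = maxGap(prev.status, cur.status);
            if (limit < 0) {
                return Optional.of("illegal transition " + prev.status + " -> " + cur.status);
            }
            if (cur.eventTime - prev.eventTime > limit) {
                return Optional.of(prev.status + " -> " + cur.status + " took too long");
            }
            if (cur.eventTime - begin > 5 * DAY) {
                return Optional.of("transaction exceeded 5 days");
            }
        }
        Event last = events.get(events.size() - 1);
        if (last.status != Status.COMPLETE) {
            // Timeout: the next event is overdue relative to the last one or to BEGIN + 5 days.
            long nextLimit = last.status == Status.BEGIN ? 5 * DAY : DAY;
            long deadline = Math.min(last.eventTime + nextLimit, begin + 5 * DAY);
            if (nowMillis > deadline) {
                return Optional.of("timed out waiting after " + last.status);
            }
        }
        return Optional.empty();
    }
}
```

The 5-day timeout after a bare BEGIN follows the rule BEGIN -> COMPLETE (within 5 days); a stricter reading would require a PROCESSING event within 1 day whenever the transaction is not completed by then.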

The number of transactions per day could be 10-50 million. I am looking at
Flink CEP for this problem. I have gone through its docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/cep.html

But I am still not sure how to design this pattern. Is it possible
to design such a pattern with Flink CEP?

Also, if I am not wrong, I need to use a PatternTimeoutFunction to trigger
the alert.

Thanks,
Shishal


Re: flink sql: "slow" performance on Over Window aggregation with time attribute set to event time

2018-03-14 Thread Fabian Hueske
Hi,

Flink advances watermarks based on all parallel source tasks. If one of the
source tasks lags behind the others, the event time progresses as
determined by the "slowest" source task.
Hence, records ingested from a faster task might have a higher processing
latency.
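A toy model may help illustrate this. In the plain-Java sketch below (hypothetical names, no Flink dependency), each parallel source emits watermarks equal to the largest timestamp it has seen minus a fixed lag (10 ms in the setup quoted below), and the downstream operator's event time is the minimum over all sources. A record from the fast source only counts as ready once the slow source has caught up, which is one way a record can wait much longer than the configured lag.

```java
import java.util.Arrays;

/**
 * Toy model of event-time progress across parallel sources. Each source's
 * watermark is (max timestamp seen - lag); the downstream operator's event
 * time is the minimum over all sources. Illustrative only, not Flink code.
 */
final class WatermarkModel {

    private final long lagMillis;
    private final long[] maxTimestampPerSource;

    WatermarkModel(int sources, long lagMillis) {
        this.lagMillis = lagMillis;
        this.maxTimestampPerSource = new long[sources];
        Arrays.fill(maxTimestampPerSource, Long.MIN_VALUE);
    }

    /** A source task sees a record with the given event timestamp. */
    void onRecord(int source, long eventTimestamp) {
        maxTimestampPerSource[source] = Math.max(maxTimestampPerSource[source], eventTimestamp);
    }

    /** Watermark of one source: max seen timestamp minus the configured lag. */
    long sourceWatermark(int source) {
        long max = maxTimestampPerSource[source];
        return max == Long.MIN_VALUE ? Long.MIN_VALUE : max - lagMillis;
    }

    /** Event time at the downstream operator: the minimum over all sources. */
    long operatorWatermark() {
        long min = Long.MAX_VALUE;
        for (int s = 0; s < maxTimestampPerSource.length; s++) {
            min = Math.min(min, sourceWatermark(s));
        }
        return min;
    }

    /** In this model, a record is finalized once event time has reached its timestamp. */
    boolean isReady(long eventTimestamp) {
        return operatorWatermark() >= eventTimestamp;
    }
}
```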

Best, Fabian

2018-03-14 1:36 GMT+01:00 Yan Zhou [FDS Science]:

> Hi,
>
> I am using Flink SQL in my application. It simply reads records from a Kafka
> source, converts them to a table, then runs a query that computes an over window
> aggregation for each record. A time-lag watermark assigner with a 10 ms time lag
> is used.
>
> The performance is not ideal. The end-to-end latency, which is the
> difference between the time a record arrives at the Flink source and the time
> the record arrives at the Flink sink, is around 250 ms (median). Please note
> that my query, which is an over window aggregation, generates one result
> for each input record. I was expecting it to be less than 100 ms. I increased
> the number of queries 100 times and still got the same median end-to-end
> latency with plenty of CPU and memory available. It seems to me that
> something is holding my application back.
>
> However, when I use processing time as the time attribute without changing
> anything else, the latency is reduced to 50 ms. I understand that in general
> using processing time should be faster. But in my event-time test, the
> time lag is set to only 10 ms, which should mean the operators will almost
> immediately process the events after they arrive. And the classes that
> calculate the over window aggregation (ProcTimeBoundedRangeOver,
> RowTimeBoundedRowsOver, etc.) basically have the same logic. Why does
> using processing time rather than event time make such a big difference in
> end-to-end latency? And what is holding my application back when the time
> attribute is set to event time?
>
> Below are my cluster and application setup. Thank you for your time.
>
>
> *The cluster:*
>
> The cluster runs in standalone mode with 7 servers. Each server has 24
> cores and 240 GB memory. There is 1 JobManager and there are 6 TaskManagers.
> Each TaskManager is allocated 12 cores, 120 GB memory and 6 task slots.
> HDFS is running on SSDs on these servers as well.
>
>
> *The application:*
>
> When an event arrives in Flink from Kafka, the application sets an ingestionTs
> on the event. When the event arrives at the sink, the processing latency
> is calculated as System.currentTimeMillis() - ingestionTs. This value is
> considered the end-to-end latency; it is recorded in a histogram metric and
> can be viewed in the Flink web UI. The RocksDB state backend is used. A
> time-lag watermark assigner with a time lag of 10 ms is used.
>
>
> Custom Source
> -> Flat Map
> -> Timestamps/Watermarks
> -> (from: (id, ip, type, ingestionTs, eventTs) -> select: (id, ip, type, ingestionTs, eventTs))
> --HASH-->
> over: (PARTITION BY: ip,
>        ORDER BY: eventTs,
>        RANGEBETWEEN 8640 PRECEDING AND CURRENT ROW,
>        select: (id, ip, eventTs, COUNT(*) AS w0$o0), ingestionTs)
> -> select: (id, eventTs, w0$o0 AS CNT), ingestionTs)
> -> to: Tuple2
> -> Sink: Unnamed
>
> select id, eventTs, count(*) over (partition by id order by eventTs
> range between interval '24' hour preceding and current row) as cnt1 from
> myTable
>
>
>
>


Re: sorting data into sink

2018-03-14 Thread Fabian Hueske
Hi,

To be honest, I did not understand your requirements and what you are
looking for.

stream.keyBy("partition").addSink(...) will partition the output on the
"partition" attribute before handing it to the sink.
Hence, all records with the same "partition" value will be handled by the
same parallel sink instance.
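The routing guarantee can be illustrated without Flink. The plain-Java sketch below uses hypothetical hour-like keys and String.hashCode() as a stand-in for Flink's own key hashing, maps each key to a key group and then to one of the parallel sink instances (maximum parallelism 128 and parallelism 4 are assumed values). Every record with a given key lands on exactly one instance, but nothing prevents several keys from sharing an instance.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of keyBy routing to parallel sink instances; not Flink's implementation. */
final class KeyRouting {

    private KeyRouting() {}

    /** Parallel instance that receives all records carrying this key. */
    static int instanceFor(String key, int maxParallelism, int parallelism) {
        // Stand-in hash: String.hashCode() mapped into [0, maxParallelism).
        int keyGroup = Math.floorMod(key.hashCode(), maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    /** Collects, per instance index, the distinct keys routed to it. */
    static Map<Integer, Set<String>> route(List<String> recordKeys, int maxParallelism, int parallelism) {
        Map<Integer, Set<String>> keysPerInstance = new HashMap<>();
        for (String key : recordKeys) {
            int instance = instanceFor(key, maxParallelism, parallelism);
            keysPerInstance.computeIfAbsent(instance, i -> new HashSet<>()).add(key);
        }
        return keysPerInstance;
    }
}
```

Because the instance is a pure function of the key, the same hour can never be split across sink instances once the sink is attached to the keyed stream; one instance may still see several hours.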

Best, Fabian

2018-03-13 20:20 GMT+01:00 Telco Phone:

> Does anyone know if this is a correct assumption?
>
>
> DataStream sorted = stream.keyBy("partition");
>
> Will this automatically send records with the same key to the same sink thread?
>
>
> The behavior I am seeing is that, in a sink set up with multiple threads,
> several threads see data from the same hour.
>
> Are there any good examples of how to sort data so that each sink thread only
> gets the same type of data?
>
> Thanks
>
>
>


Re: State serialization problem when we add a new field in the object

2018-03-14 Thread Kostas Kloudas
Hi Konstantin,

What you could do is write an intermediate job that has both the old 
ValueState “oldState” and a new one, “newState”, with the new format. 

When an element comes into this intermediate job, you check whether the 
oldState is empty for that key. 
If it is null (empty), you simply process the element as if it were the first 
time you see the key. 
If it is not empty, then you implement your migration logic that ports the 
oldState to the newState format, 
you store the migrated state in the newState, and delete it from the oldState. 
Of course, after the migration you process the element as usual, but only use 
the new state.
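The per-element logic described above can be sketched in plain Java. In this hypothetical model two maps stand in for the old and new keyed ValueStates, OldFormat and NewFormat are made-up state classes, and "default" is an assumed value for the added field; in a Flink job these would be ValueState handles used inside a keyed function. The sketch also makes one detail explicit: an empty old state can mean the key was already migrated, so normal processing always reads from the new state.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of lazy, per-key state migration; all names are hypothetical. */
final class LazyStateMigration {

    static final class OldFormat {
        final long count;

        OldFormat(long count) {
            this.count = count;
        }
    }

    static final class NewFormat {
        final long count;
        final String addedField; // the field introduced by the new version

        NewFormat(long count, String addedField) {
            this.count = count;
            this.addedField = addedField;
        }
    }

    final Map<String, OldFormat> oldState = new HashMap<>();
    final Map<String, NewFormat> newState = new HashMap<>();

    void processElement(String key) {
        OldFormat old = oldState.get(key);
        if (old != null) {
            // Port the old-format value to the new format, then clear the old state.
            newState.put(key, new NewFormat(old.count, "default"));
            oldState.remove(key);
        }
        // Normal processing only ever touches the new state (here: a counter).
        NewFormat current = newState.get(key);
        long count = current == null ? 0 : current.count;
        String field = current == null ? "default" : current.addedField;
        newState.put(key, new NewFormat(count + 1, field));
    }
}
```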

If at some point you are sure that you have seen all the keys from the previous 
version of the code, then at that point you can be sure that all the old-format 
states have been migrated, and you can take a savepoint, remove the migration 
logic from the job, and resume from the savepoint with the new code. 

If there is no point at which you can be sure that you have migrated the 
state for all keys, then you just let your job run like this, i.e. with the 
migration logic. 

The problem with the above strategy is that, if you do not have a point where 
you can be sure that you have seen all keys and you want to migrate once again 
in the future, you will have to implement the same thing but migrate from two 
different previous versions. But at that point you may have a policy that says 
that if you have not seen a key for the last week or month, then you do not 
consider it active and do not care about it.

I hope this helps!

Cheers,
Kostas

> On Mar 14, 2018, at 10:03 AM, kla wrote:
> 
> Hi Aljoscha,
> 
> Thanks for your reply.
> 
> Do you have a suggestion for how we can work around it?
> 
> We have a production system running with Flink, and it is mandatory to add
> one more field to the state.
> 
> Maybe somehow we can write our own serializer?
> 
> Thanks,
> Konstantin
> 
> 
> 



Re: State serialization problem when we add a new field in the object

2018-03-14 Thread Fabian Hueske
Hi,

Flink has supported upgrading serializers [1][2] since version 1.3.
You probably need to upgrade to Flink 1.3 before you can use the feature.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/custom_serialization.html
[2] https://issues.apache.org/jira/browse/FLINK-6178

2018-03-14 10:03 GMT+01:00 kla:

> Hi Aljoscha,
>
> Thanks for your reply.
>
> Do you have a suggestion for how we can work around it?
>
> We have a production system running with Flink, and it is mandatory to add
> one more field to the state.
>
> Maybe somehow we can write our own serializer?
>
> Thanks,
> Konstantin
>
>
>
>


Re: Can i use Avro serializer as default instead of kryo serializer in Flink for scala API ?

2018-03-14 Thread shashank agarwal
Hi,

When I force-enable Avro, the code doesn't work, because some dependencies in
the Flink CEP library also need the generic serializer.
So I have another question.

We are using Scala for our Flink program, and we need schema evolution support
for our managed state, because the fields in our models change.

Do case classes support that, or do we have to use Avro?

Should we generate our model classes with the Avro schema code
generator and use them?

Avro generates Java classes; is it a problem to use them in our Scala program
and Scala streams?

Is avro4s supported by Flink?




On Mon, Sep 25, 2017 at 8:24 PM, Nico Kruber wrote:

> Hi Shashank,
> enabling Avro as the default de/serializer for Flink should be as simple as
> the following, according to [1]
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.getConfig.enableForceAvro()
>
> I am, however, no expert on this and the implications regarding the use of
> Avro from inside Scala, so I included Gordon (cc'd) who may know more.
>
>
>
> Nico
>
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html
>
> On Saturday, 23 September 2017 10:11:28 CEST shashank agarwal wrote:
> > Hello Team,
> >
> > As our schema evolves due to business logic, we want to use an extensible
> > schema format like Avro as the default serializer and deserializer for our
> > Flink program and state.
> >
> > My doubt is: we are using the Scala API in our Flink program, but Avro by
> > default supports Java POJOs. So how can we use it with our Scala API? Do we
> > have to use a serializer like avro4s? Or can we use default Avro in our Scala
> > Flink app, and if so, what are the steps?
> >
> > Please guide.
>
>
>


-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things


Re: Global Window, Trigger and Watermarks, Parallelism

2018-03-14 Thread dim5b
I see you replied on

https://stackoverflow.com/questions/48028061/flink-cep-greedy-matching

pointing to a known bug:

https://issues.apache.org/jira/browse/FLINK-8914

In my case, my pattern looks like this:

Pattern tripPattern =
    Pattern.begin("start").times(1).where(START_CONDITION)
        .followedBy("middle").where(MIDDLE_CONDITION).oneOrMore()
        .next("end").where(END_CONDITION);

The end result is:

1> 1,2,3,4,
4> 1,2,3,6,7,8,11,12,
1> 5,6,7,8,11,12,
3> 5,6,7,8,9,
2> 1,2,3,6,7,8,9,
2> 10,11,12,

By using the next operator, I limit the end-terminating side, but begin takes
all possible solutions.





state backend configuration

2018-03-14 Thread Jayant Ameta
Hi,
Can I enable incremental checkpointing for RocksDB via the flink-conf.yaml file?

Also, I can set state.backend via the conf file as well as in my code using
env.setStateBackend(). Which of these takes precedence?

Jayant Ameta


Re: State serialization problem when we add a new field in the object

2018-03-14 Thread kla
Hi Aljoscha,

Thanks for your reply.

Do you have a suggestion for how we can work around it?

We have a production system running with Flink, and it is mandatory to add
one more field to the state.

Maybe somehow we can write our own serializer?

Thanks,
Konstantin





Re: Extremely large job serialization produced by union operator

2018-03-14 Thread Fabian Hueske
Can you share the operator plan
(StreamExecutionEnvironment.getExecutionPlan()) for both cases?

Thanks, Fabian

2018-03-14 9:08 GMT+01:00 杨力:

> I understand that complex SQL queries are translated into large DAGs.
> However, the submission succeeds in my case if I don't use the union operator,
> so it might be a bug related to it. For example, the following code
> is submitted successfully with the default akka.framesize limit.
>
> val sqls: Seq[String] = ...
> val sink: JDBCAppendTableSink = ...
>
> sqls foreach {
>   sql =>
>     val table = tEnv.sqlQuery(sql)
>     val outputStream = tEnv.toAppendStream[Row](table) map {
>       ...
>     }
>     tEnv.fromDataStream(outputStream).writeToSink(sink)
> }
>
> If I union these outputStreams and send them to a single sink, the size of
> the serialized job will be 100 MB.
>
> val outputStream = sqls map {
>   sql =>
>     val table = tEnv.sqlQuery(sql)
>     tEnv.toAppendStream[Row](table) map {
>       ...
>     }
> } reduce {
>   (a, b) => a union b
> }
> tEnv.fromDataStream(outputStream).writeToSink(sink)
>
> I failed to reproduce it without the actual table schemas and SQL
> queries used in production. In the end, I wrote my own JDBC sink with
> connection pooling to mitigate this problem. Maybe someone familiar with the
> implementation of the union operator could figure out what's going wrong.
>
> Fabian Hueske wrote on Tue, Mar 13, 2018 at 11:42 PM:
>
>> Hi Bill,
>>
>> The size of the program depends on the number and complexity of the SQL
>> queries that you are submitting.
>> Each query might be translated into a sequence of multiple operators.
>> Each operator has a string with generated code that will be compiled on the
>> worker nodes. The size of the code depends on the number of fields in the
>> schema.
>> Operators and code are not shared across queries.
>>
>> Best, Fabian
>>
>> 2018-03-09 23:36 GMT+01:00 杨力:
>>
>>> Thank you for your response. It occurs both in a standalone cluster and
>>> a YARN cluster. I am trying to remove the business code and reproduce it
>>> with a minimal demo.
>>>
>>>
>>> On Sat, Mar 10, 2018 at 2:27 AM Piotr Nowojski wrote:
>>>
>>>> Hi,
>>>>
>>>> Could you provide more details about your queries and setup? Logs could
>>>> be helpful as well.
>>>>
>>>> Piotrek
>>>>
>>>> > On 9 Mar 2018, at 11:00, 杨力 wrote:
>>>> >
>>>> > I wrote a flink-sql app with the following topology.
>>>> >
>>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
>>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
>>>> > ...
>>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
>>>> >
>>>> > I have a dozen TableSources and tens of SQL queries. As a result, the
>>>> > number of JDBCAppendTableSinks times the parallelism, that is, the number
>>>> > of concurrent connections to the database, is too large for the database
>>>> > server to handle. So I tried to union the DataStreams before connecting
>>>> > them to the TableSink.
>>>> >
>>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map
>>>> >                                                      \
>>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map --- union -> JDBCAppendTableSink
>>>> > ...                                                  /
>>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map
>>>> >
>>>> > With this strategy, job submission failed with an
>>>> > OversizedPayloadException of 104 MB. Increasing akka.framesize helps to
>>>> > avoid this exception, but then job submission hangs and times out.
>>>> >
>>>> > I can't understand why a simple union operator would serialize to
>>>> > such a large message. Can I avoid this problem?
>>>> > Or can I change some configuration to fix the submission timeout?
>>>> >
>>>> > Regards,
>>>> > Bill


>>


Re: Extremely large job serialization produced by union operator

2018-03-14 Thread 杨力
I understand that complex SQL queries are translated into large DAGs.
However, the submission succeeds in my case if I don't use the union operator,
so it might be a bug related to it. For example, the following code
is submitted successfully with the default akka.framesize limit.

val sqls: Seq[String] = ...
val sink: JDBCAppendTableSink = ...

sqls foreach {
  sql =>
    val table = tEnv.sqlQuery(sql)
    val outputStream = tEnv.toAppendStream[Row](table) map {
      ...
    }
    tEnv.fromDataStream(outputStream).writeToSink(sink)
}

If I union these outputStreams and send them to a single sink, the size of the
serialized job will be 100 MB.

val outputStream = sqls map {
  sql =>
    val table = tEnv.sqlQuery(sql)
    tEnv.toAppendStream[Row](table) map {
      ...
    }
} reduce {
  (a, b) => a union b
}
tEnv.fromDataStream(outputStream).writeToSink(sink)

I failed to reproduce it without the actual table schemas and SQL
queries used in production. In the end, I wrote my own JDBC sink with
connection pooling to mitigate this problem. Maybe someone familiar with the
implementation of the union operator could figure out what's going wrong.

Fabian Hueske wrote on Tue, Mar 13, 2018 at 11:42 PM:

> Hi Bill,
>
> The size of the program depends on the number and complexity of the SQL
> queries that you are submitting.
> Each query might be translated into a sequence of multiple operators. Each
> operator has a string with generated code that will be compiled on the
> worker nodes. The size of the code depends on the number of fields in the
> schema.
> Operators and code are not shared across queries.
>
> Best, Fabian
>
> 2018-03-09 23:36 GMT+01:00 杨力:
>
>> Thank you for your response. It occurs both in a standalone cluster and
>> a YARN cluster. I am trying to remove the business code and reproduce it
>> with a minimal demo.
>>
>>
>> On Sat, Mar 10, 2018 at 2:27 AM Piotr Nowojski wrote:
>>
>>> Hi,
>>>
>>> Could you provide more details about your queries and setup? Logs could
>>> be helpful as well.
>>>
>>> Piotrek
>>>
>>> > On 9 Mar 2018, at 11:00, 杨力 wrote:
>>> >
>>> > I wrote a flink-sql app with the following topology.
>>> >
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
>>> > ...
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
>>> >
>>> > I have a dozen TableSources and tens of SQL queries. As a result, the
>>> > number of JDBCAppendTableSinks times the parallelism, that is, the number
>>> > of concurrent connections to the database, is too large for the database
>>> > server to handle. So I tried to union the DataStreams before connecting
>>> > them to the TableSink.
>>> >
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map
>>> >                                                      \
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map --- union -> JDBCAppendTableSink
>>> > ...                                                  /
>>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map
>>> >
>>> > With this strategy, job submission failed with an
>>> > OversizedPayloadException of 104 MB. Increasing akka.framesize helps to
>>> > avoid this exception, but then job submission hangs and times out.
>>> >
>>> > I can't understand why a simple union operator would serialize to such
>>> > a large message. Can I avoid this problem?
>>> > Or can I change some configuration to fix the submission timeout?
>>> >
>>> > Regards,
>>> > Bill
>>>
>>>
>