Re: Breakage in Flink CLI in 1.5.0

2018-06-19 Thread Sampath Bhat
Hi Chesnay

Adding on to this point you made - " the rpc address is still *required *due
to some technical implementations; it may be that you can set this to some
arbitrary value however."

For job submission to succeed we must give a specific, resolvable rpc
address and not an arbitrary value. If an arbitrary value is given, the
job submission fails with the following error -
org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't
retrieve standalone cluster
at
org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:51)
at
org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:31)
at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:249)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.net.UnknownHostException: flinktest-flink-jobmanager1233445:
Name or service not known
 (Random name flinktest-flink-jobmanager1233445)
at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
at
java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
at java.net.InetAddress.getAllByName0(InetAddress.java:1276)
at java.net.InetAddress.getAllByName(InetAddress.java:1192)
at java.net.InetAddress.getAllByName(InetAddress.java:1126)
at java.net.InetAddress.getByName(InetAddress.java:1076)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.getRpcUrl(AkkaRpcServiceUtils.java:171)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.getRpcUrl(AkkaRpcServiceUtils.java:136)
at
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:83)
at
org.apache.flink.client.program.ClusterClient.<init>(ClusterClient.java:158)
at
org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:184)
at
org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:157)
at
org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:49)
... 7 more


On Wed, Jun 20, 2018 at 11:18 AM, Sampath Bhat 
wrote:

> Hi Chesnay
>
> If the REST API (i.e. the web server) is mandatory for submitting jobs, then
> why is there an option to set rest.port to -1? I think it should be
> mandatory to set some valid port for rest.port, and the Flink job
> manager should not come up if no valid port is set for rest.port. Or else
> there must be some way to submit jobs even if the REST API (i.e. the web
> server) is not instantiated.
>
> If jobmanager.rpc.address is not required by the flink client, then why is it
> still looking for that property in flink-conf.yaml? Isn't that a bug?
> Because if we comment out jobmanager.rpc.address and jobmanager.rpc.port,
> then the flink client will not be able to submit the job.
>
>
> On Tue, Jun 19, 2018 at 5:49 PM, Chesnay Schepler 
> wrote:
>
>> In 1.5 we reworked the job-submission to go through the REST API instead
>> of akka.
>>
>> I believe the jobmanager rpc port shouldn't be necessary anymore, the rpc
>> address is still *required *due to some technical implementations; it
>> may be that you can set this to some arbitrary value however.
>>
>> As a result the REST API (i.e. the web server) must be running in order
>> to submit jobs.
>>
>>
>> On 19.06.2018 14:12, Sampath Bhat wrote:
>>
>> Hello
>>
>> I'm using Flink 1.5.0 version and Flink CLI to submit jar to flink
>> cluster.
>>
>> In flink 1.4.2 only job manager rpc address and job manager rpc port were
>> sufficient for flink client to connect to job manager and submit the job.
>>
>> But in flink 1.5.0 the flink client additionally requires the
>> rest.address and rest.port for submitting the job to job manager. What is
>> the advantage of this new method over the 1.4.2 method of submitting job?
>>
>> Moreover, if we set rest.port = -1 the web server will not be
>> instantiated, so how should we submit the job?
>>
>> Regards
>> Sampath
>>
>>
>>
>


Re: Breakage in Flink CLI in 1.5.0

2018-06-19 Thread Sampath Bhat
Hi Chesnay

If the REST API (i.e. the web server) is mandatory for submitting jobs, then why
is there an option to set rest.port to -1? I think it should be mandatory
to set some valid port for rest.port, and the Flink job manager should not
come up if no valid port is set for rest.port. Or else there must be
some way to submit jobs even if the REST API (i.e. the web server) is not
instantiated.

If jobmanager.rpc.address is not required by the flink client, then why is it
still looking for that property in flink-conf.yaml? Isn't that a bug?
Because if we comment out jobmanager.rpc.address and jobmanager.rpc.port,
then the flink client will not be able to submit the job.


On Tue, Jun 19, 2018 at 5:49 PM, Chesnay Schepler 
wrote:

> In 1.5 we reworked the job-submission to go through the REST API instead
> of akka.
>
> I believe the jobmanager rpc port shouldn't be necessary anymore, the rpc
> address is still *required *due to some technical implementations; it may
> be that you can set this to some arbitrary value however.
>
> As a result the REST API (i.e. the web server) must be running in order to
> submit jobs.
>
>
> On 19.06.2018 14:12, Sampath Bhat wrote:
>
> Hello
>
> I'm using Flink 1.5.0 version and Flink CLI to submit jar to flink cluster.
>
> In flink 1.4.2 only job manager rpc address and job manager rpc port were
> sufficient for flink client to connect to job manager and submit the job.
>
> But in flink 1.5.0 the flink client additionally requires the rest.address
> and rest.port for submitting the job to job manager. What is the advantage
> of this new method over the 1.4.2 method of submitting job?
>
> Moreover, if we set rest.port = -1 the web server will not be instantiated,
> so how should we submit the job?
>
> Regards
> Sampath
>
>
>


Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-19 Thread Siew Wai Yow
Hi all,


It seems that passing in target-directory is now mandatory for the savepoints REST API, and the 
status response no longer contains the savepoint directory. I can pass it in, but 
the information is redundant with what is already defined in 
flink-conf.yaml. May I know if there is a way to retrieve the savepoint 
directory from flink-conf.yaml in a Flink application? I am not able to get it 
from env.getConfig(). Thank you.
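
(For illustration, a minimal sketch of reading the configured default savepoint directory
directly from flink-conf.yaml — assuming the standard "state.savepoints.dir" key and that the
process running this code can locate the configuration directory, e.g. via FLINK_CONF_DIR;
the class name is made up.)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;

public class SavepointDirFromConf {

    public static void main(String[] args) {
        // loads flink-conf.yaml independently of env.getConfig()
        Configuration conf = GlobalConfiguration.loadConfiguration();
        String savepointDir = conf.getString("state.savepoints.dir", null);
        System.out.println("Configured savepoint directory: " + savepointDir);
    }
}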



From: Chesnay Schepler 
Sent: Tuesday, June 19, 2018 11:55 PM
To: user@flink.apache.org
Subject: Re: Questions regarding to Flink 1.5.0 REST API change

1. PATCH to /jobs/:jobid, you can specify CANCEL/STOP with the "mode" query 
parameter

2. POST to /jobs/:jobid/savepoints, with a json payload. Returns a trigger id, 
used for 3).
{
  "target-directory" : {
    "type" : "string"
  },
  "cancel-job" : {
    "type" : "boolean"
  }
}


3. GET to /jobs/:jobid/savepoints/:triggerid
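
(Not from the thread — just a rough Java sketch of steps 2) and 3) above; the host, port,
job id, savepoint directory and the trigger-id placeholder are illustrative, and the JSON
response is printed raw rather than parsed.)

import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class SavepointRestSketch {

    public static void main(String[] args) throws Exception {
        String base = "http://192.168.56.151:8081";
        String jobId = "<jobid>";

        // 2) POST /jobs/:jobid/savepoints with a JSON payload; the response carries the trigger id
        String payload = "{ \"target-directory\": \"hdfs:///flink/savepoints\", \"cancel-job\": true }";
        HttpURLConnection post = (HttpURLConnection)
                new URL(base + "/jobs/" + jobId + "/savepoints").openConnection();
        post.setRequestMethod("POST");
        post.setRequestProperty("Content-Type", "application/json");
        post.setDoOutput(true);
        try (OutputStream out = post.getOutputStream()) {
            out.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("trigger response: " + read(post.getInputStream()));

        // 3) GET /jobs/:jobid/savepoints/:triggerid to poll the progress
        //    (replace <triggerid> with the id returned by the POST above)
        String triggerId = "<triggerid>";
        HttpURLConnection get = (HttpURLConnection)
                new URL(base + "/jobs/" + jobId + "/savepoints/" + triggerId).openConnection();
        System.out.println("status response: " + read(get.getInputStream()));
    }

    private static String read(InputStream in) {
        try (Scanner s = new Scanner(in, StandardCharsets.UTF_8.name()).useDelimiter("\\A")) {
            return s.hasNext() ? s.next() : "";
        }
    }
}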

On 19.06.2018 17:40, Esteban Serrano wrote:
For #1, you need to use a PATCH request to "/jobs/:jobid"

On Tue, Jun 19, 2018 at 11:35 AM Siew Wai Yow 
mailto:wai_...@hotmail.com>> wrote:

Hi,


Regarding to Flink 1.5.0 REST API breaking change,

  *   The REST API to cancel a job was changed.
  *   The REST API to cancel a job with savepoint was changed.

I have a few dumb questions,


  1.  Any replacement for cancellation ONLY without save-point? Only found 
"/jobs/:jobid/savepoints".
  2.  For "/jobs/:jobid/savepoints", how could I form the URL with cancellation 
and with directory afterward?
 *   http://192.168.56.151:8081/jobs//savepoints/cancel-job/true??
  3.  Any cancellation progress monitoring in 1.5 like previous version 1.3/1.4?
 *   previous version: 
/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId

Thank you.

Regards,
Yow





Re:How to get past "bad" Kafka message, restart, keep state

2018-06-19 Thread sihua zhou
Hi,


Flink will reset the Kafka offset to the latest successful checkpoint on 
recovery, but the "bad" message will always raise an exception and cause another recovery, 
so it will never be covered by any successful checkpoint, and your job will 
never get past that "bad" record.


I think you may need to use a try-catch block to handle the exception in 
{{parseData(input)}} yourself, maybe as follows.


{code}
String[] tokens = record.toLowerCase().split(",");

// Get Key
String key = tokens[0];

try {
    // Get Integer Value
    String integerValue = tokens[1];
    System.out.println("Trying to Parse=" + integerValue);
    Integer value = Integer.parseInt(integerValue);

    // Build Tuple
    return new Tuple2<>(key, value);
} catch (NumberFormatException | ArrayIndexOutOfBoundsException e) {
    // fall back to a neutral value so the "bad" record does not fail the job
    return new Tuple2<>(key, 0);
}
{code}
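
Alternatively — just a sketch, not from the original code — a FlatMapFunction lets you drop the
bad record entirely instead of emitting a placeholder value (the class name below is made up):

{code}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class SafeNumberMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String record, Collector<Tuple2<String, Integer>> out) {
        String[] tokens = record.toLowerCase().split(",");
        if (tokens.length != 2) {
            return; // malformed record: drop it
        }
        try {
            out.collect(new Tuple2<>(tokens[0], Integer.parseInt(tokens[1])));
        } catch (NumberFormatException e) {
            // "poison" value such as "foobar": skip it instead of failing the job
        }
    }
}

// usage: messageStream.flatMap(new SafeNumberMapper()).keyBy(0).sum(1);
{code}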


Best, Sihua
On 06/20/2018 08:57,chrisr123 wrote:
First time I'm trying to get this to work so bear with me. I'm trying to
learn checkpointing with Kafka and handling "bad" messages, restarting
without losing state.

Use Case:
Use checkpointing.
Read a stream of integers from Kafka, keep a running sum.
If a "bad" Kafka message read, restart app, skip the "bad" message, keep
state.
My stream would something look like this:

set1,5
set1,7
set1,foobar
set1,6

I want my app to keep a running sum of the integers it has seen, and restart
if it crashes without losing state. so my running sum would be:
5,
12,
app crashes and restarts
18

However, I'm finding that when my app restarts, it keeps reading the bad "foobar"
message and doesn't get past it. Source code below. The mapper bombs when I
try to parse "foobar" as an Integer.
How can I modify the app to get past the "poison" message?

env.enableCheckpointing(1000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
env.getCheckpointConfig().setCheckpointTimeout(1);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setStateBackend(new
FsStateBackend("hdfs://mymachine:9000/flink/checkpoints"));

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BROKERS);
properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST);
properties.setProperty("group.id", "consumerGroup1");

FlinkKafkaConsumer08<String> kafkaConsumer = new FlinkKafkaConsumer08<>(topicName,
new SimpleStringSchema(), properties);
DataStream<String> messageStream = env.addSource(kafkaConsumer);

DataStream<Tuple2<String, Integer>> sums = messageStream
.map(new NumberMapper())
.keyBy(0)
.sum(1);
sums.print();


private static class NumberMapper implements
MapFunction<String, Tuple2<String, Integer>> {
    public Tuple2<String, Integer> map(String input) throws Exception {
        return parseData(input);
    }

    private Tuple2<String, Integer> parseData(String record) {

        String[] tokens = record.toLowerCase().split(",");

        // Get Key
        String key = tokens[0];

        // Get Integer Value
        String integerValue = tokens[1];
        System.out.println("Trying to Parse=" + integerValue);
        Integer value = Integer.parseInt(integerValue);

        // Build Tuple
        return new Tuple2<>(key, value);
    }

}




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-19 Thread Siew Wai Yow
Thank you @Chesnay Schepler and @Esteban 
Serrano!


From: Chesnay Schepler 
Sent: Tuesday, June 19, 2018 11:55 PM
To: user@flink.apache.org
Subject: Re: Questions regarding to Flink 1.5.0 REST API change

1. PATCH to /jobs/:jobid, you can specify CANCEL/STOP with the "mode" query 
parameter

2. POST to /jobs/:jobid/savepoints, with a json payload. Returns a trigger id, 
used for 3).
{
  "target-directory" : {
    "type" : "string"
  },
  "cancel-job" : {
    "type" : "boolean"
  }
}


3. GET to /jobs/:jobid/savepoints/:triggerid

On 19.06.2018 17:40, Esteban Serrano wrote:
For #1, you need to use a PATCH request to "/jobs/:jobid"

On Tue, Jun 19, 2018 at 11:35 AM Siew Wai Yow 
mailto:wai_...@hotmail.com>> wrote:

Hi,


Regarding to Flink 1.5.0 REST API breaking change,

  *   The REST API to cancel a job was changed.
  *   The REST API to cancel a job with savepoint was changed.

I have a few dumb questions,


  1.  Any replacement for cancellation ONLY without save-point? Only found 
"/jobs/:jobid/savepoints".
  2.  For "/jobs/:jobid/savepoints", how could I form the URL with cancellation 
and with directory afterward?
 *   http://192.168.56.151:8081/jobs//savepoints/cancel-job/true??
  3.  Any cancellation progress monitoring in 1.5 like previous version 1.3/1.4?
 *   previous version: 
/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId

Thank you.

Regards,
Yow





How to get past "bad" Kafka message, restart, keep state

2018-06-19 Thread chrisr123
First time I'm trying to get this to work so bear with me. I'm trying to
learn checkpointing with Kafka and handling "bad" messages, restarting
without losing state.

Use Case:
Use checkpointing.
Read a stream of integers from Kafka, keep a running sum.
If a "bad" Kafka message read, restart app, skip the "bad" message, keep
state.
My stream would something look like this:

set1,5
set1,7
set1,foobar
set1,6

I want my app to keep a running sum of the integers it has seen, and restart
if it crashes without losing state. so my running sum would be:
5,
12,
app crashes and restarts
18

However, I'm finding that when my app restarts, it keeps reading the bad "foobar"
message and doesn't get past it. Source code below. The mapper bombs when I
try to parse "foobar" as an Integer.
How can I modify the app to get past the "poison" message?

env.enableCheckpointing(1000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
env.getCheckpointConfig().setCheckpointTimeout(1);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setStateBackend(new
FsStateBackend("hdfs://mymachine:9000/flink/checkpoints"));

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BROKERS);
properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST);
properties.setProperty("group.id", "consumerGroup1");

FlinkKafkaConsumer08<String> kafkaConsumer = new FlinkKafkaConsumer08<>(topicName,
new SimpleStringSchema(), properties);
DataStream<String> messageStream = env.addSource(kafkaConsumer);

DataStream<Tuple2<String, Integer>> sums = messageStream
  .map(new NumberMapper())
  .keyBy(0)
  .sum(1);
sums.print();


private static class NumberMapper implements
MapFunction<String, Tuple2<String, Integer>> {
    public Tuple2<String, Integer> map(String input) throws Exception {
        return parseData(input);
    }

    private Tuple2<String, Integer> parseData(String record) {

        String[] tokens = record.toLowerCase().split(",");

        // Get Key
        String key = tokens[0];

        // Get Integer Value
        String integerValue = tokens[1];
        System.out.println("Trying to Parse=" + integerValue);
        Integer value = Integer.parseInt(integerValue);

        // Build Tuple
        return new Tuple2<>(key, value);
    }

}




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Memory Leak in ProcessingTimeSessionWindow

2018-06-19 Thread ashish pok
All,
I took a few heap dumps (when the app restarts and at 2-hour intervals) using jmap; 
they are 5GB to 8GB. I did some comparisons and what I can see is that the heap shows 
the counts of data tuples (basically instances of the objects that are maintained as 
state) going up slowly. 
The only things I could possibly relate that to were 
streaming.api.operators.InternalTimer and 
streaming.api.windowing.windows.TimeWindow, both of which were trending up as well. There 
are definitely a lot more windows created than the increments I could notice, but 
nevertheless those objects are trending up. The input stream has a very consistent 
sine-wave throughput, so it really doesn't make sense for windows and tuples to 
keep trending up. There is also no event storm or anything of that sort (i.e. the 
source stream has been very steady as far as throughput is concerned).
Here is a plot of heap utilization:

So it has a typical sine-wave pattern, which is definitely expected as the input 
stream has the same pattern, but the source doesn't have an upward trend like the heap 
utilization shown above. The screenshot above shows a spike from 60% utilization 
to 80%, and the trend keeps going up until an issue occurs that resets the app.
Since processing is based on ProcessingTime, I really would have expected 
memory to reach a steady state and remain more or less flat from a trending 
perspective. 
Appreciate any pointers anyone might have.
Thanks, Ashish
On Monday, June 18, 2018, 12:54:03 PM EDT, ashish pok  
wrote:  
 
Right, that's where I am headed now, but was wondering if there are any “gotchas” I 
am missing before I try and dig into a few gigs of heap dump. 

Thanks, Ashish

Sent from Yahoo Mail for iPhone


On Monday, June 18, 2018, 3:37 AM, Stefan Richter  
wrote:

Hi,
can you take a heap dump from a JVM that runs into the problem and share it 
with us? That would make finding the cause a lot easier.
Best,Stefan


Am 15.06.2018 um 23:01 schrieb ashish pok :
All,
I have another slow memory leak situation using a basic TimeSession window 
(earlier it was a GlobalWindow-related one that Fabian helped clarify). 
I have a very simple data pipeline:
DataStream processedData = rawTuples
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(AppConfigs.getWindowSize(780
    .trigger(new ProcessingTimePurgeTrigger())
    .apply(new IPSLAMetricWindowFn())
    .name("windowFunctionTuple")
    .map(new TupleToPlatformEventMapFn())
    .name("mapTupleEvent");
I initially didn't even have ProcessingTimePurgeTrigger and it was using the default 
Trigger. In an effort to fix this issue, I created my own Trigger from the default 
ProcessingTimeTrigger with a simple override of the onProcessingTime method 
(essentially replacing FIRE with FIRE_AND_PURGE):
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.FIRE_AND_PURGE;
}
This seems to have done nothing (it may have delayed the issue by a couple of hours - 
not certain). But I still see heap utilization creep up slowly until it eventually 
reaches a point where GC starts to take too long, and then the dreaded OOM. 
For completeness, here is my window function (still using the old function 
interface). It creates a few metrics for reporting and applies logic by looping 
over the Iterable. NO state is explicitly kept in this function; I needed a 
RichWindowFunction basically just to generate metrics.


public class IPSLAMetricWindowFn extends RichWindowFunction {

    private static final long serialVersionUID = 1L;

    private static Logger logger = LoggerFactory.getLogger(IPSLAMetricWindowFn.class);

    private Meter in;

    private Meter out;

    private Meter error;

    @Override
    public void open(Configuration conf) throws Exception {
        this.in = getRuntimeContext()
            .getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.IN, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.out = getRuntimeContext()
            .getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.OUT, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.error = getRuntimeContext()
            .getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.ERROR, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        super.open(conf);
    }

    @Override
    public void apply(String key, TimeWindow window, Iterable events, Collector collector) throws Exception {
    }

}



Appreciate any pointers on what could be causing leaks here. This seems pretty 
straight-forward.
Thanks, Ashish





  

Strictly use TLSv1.2

2018-06-19 Thread Vinay Patil
Hi,

I have deployed Flink 1.3.2 and enabled SSL settings. From the ssl debug
logs it shows that Flink is using TLSv1.2. However based on the security
scans we have observed that it also allows TLSv1.0 and TLSv1.1.
 
In order to strictly use TLSv1.2 we have updated the following property of
java.security file:
jdk.tls.disabledAlgorithms=MD5, SSLv3, DSA, RSA keySize < 2048, TLSv1,
TLSv1.1

But it still allows TLSv1.1; I verified this by running the following command
from the master node:

openssl s_client -connect taskmanager1: -tls1

(here listening_address_port is part of
akka.ssl.tcp://flink@taskmanager1:port/user/taskmanager)

Now, when I hit the above command for the data port, it does not allow
TLSv1.1 and only allows TLSv1.2

Can you please let me know how I can enforce all the Flink ports to use
TLSv1.2?

Regards,
Vinay Patil



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Savepoint with S3

2018-06-19 Thread Anil
I'm using RocksDB and S3 for savepoints. The Flink version is 1.4. Currently
I'm creating a savepoint for the jobs every 10 mins. When the job starts,
the data is saved as expected, but after a while I see these messages in my
log and the savepoint is not saved anymore. 

2018-06-19 12:23:18,992 [flink-akka.actor.default-dispatcher-19] WARN 
httpclient.RestStorageService (RestStorageService.java:performRequest(434))
- *Error Response: HEAD
'/savepoint/849365e0-ab3c-4a37-9e30-53369a2bbbd2/sp/savepoint-869fa8-7b601cae0b4a'
-- ResponseCode: 404*, ResponseStatus: Not Found, Request Headers:
[Content-Type: , x-amz-request-payer: requester, x-amz-, User-Agent:
AWS-ElasticMapReduce, Host: dp-rill-prod.s3.amazonaws.com], Response
Headers: [x-amz-request-id: 9EC59912DD0FE206, Content-Type: application/xml,
Transfer-Encoding: chunked, Date: Tue, 19 Jun 2018 12:23:18 GMT, Server:
AmazonS3]

Each savepoint is saved as a separate folder in S3. The path for which the
response is 404 does exist in S3. I read here on the forum
that directories are the "eventually consistent" part of S3.

Is the error something that might be happening because of that? What would be
a workaround (decrease the savepoint frequency)? Has anyone faced a similar
kind of issue with RocksDB and S3? Will appreciate any help.

Thanks in advance!



 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Ordering of stream from different kafka partitions

2018-06-19 Thread Andrey Zagrebin
Hi Amol,

I think you could try (based on your stack overflow code)
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
like this:

DataStream<Event> streamSource = env
    .addSource(kafkaConsumer)
    .setParallelism(4)
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.milliseconds(3500)) {
            @Override
            public long extractTimestamp(Event element) {
                Map timeStamp = (Map) element.get("ts");
                return (long) timeStamp.get("value");
            }
        });

In general, if records are sorted by anything in a Kafka partition, parallel 
subtask of Flink Kafka source will consume these records and push to user 
operators in the same order. There is maximum one consuming subtask per Kafka 
partition but several partitions might be served by one subtask. It means that 
there are the same guarantees as in Kafka: ordering per partition but not 
across them, including no global ordering. 

The case of global and per window ordering is already described by Sihua. The 
global ordering might be impractical in case of distributed system.

If a subtask of your Flink operator consumes from several partitions or there 
is no ordering at all, you can try the above approach with 
BoundedOutOfOrdernessTimestampExtractor to get approximate ordering across 
these partitions per key or all records. It is similar to ordering within a 
window. It means there could still be late records coming after out of 
orderness period of time which can break the ordering. This operator buffers 
records in state to maintain the order but only for out of orderness period of 
time which also increases latency.

Cheers,
Andrey

> On 19 Jun 2018, at 14:12, sihua zhou  wrote:
> 
> 
> 
> Hi Amol,
> 
> 
> I'm not sure whether this is impossible, especially when you need to operate 
> on the records with multiple parallelism. 
> 
> 
> IMO, in theory, we can only get an ordered stream when there is a single 
> partition of Kafka and we operate on it with a single parallelism in Flink. Even in 
> this case, if you only want to order the records in a window, then you need 
> to store the records in state and order them when the window is 
> triggered. But if you want to order the records with a single 
> `keyBy()` (non-window), I think that's maybe impossible in practice, because 
> you need to store all the incoming records and re-order all the data for 
> every incoming record, and you also need to send a retraction message for the 
> previous result (because every incoming record might change the global order 
> of the records).
> 
> 
> Best, Sihua
> On 06/19/2018 19:19, Amol S - iProgrammer wrote:
> Hi,
> 
> I have used the Flink streaming API in my application, where the source of
> streaming is Kafka. My Kafka producer will publish data in ascending order
> of time in different partitions of Kafka, and the consumer will read data from
> these partitions. However, some Kafka partitions may be slow due to some
> operation and produce late results. Is there any way to maintain order in
> this stream even though the data arrive out of order? I have tried
> BoundedOutOfOrdernessTimestampExtractor but it didn't serve the purpose.
> While digging into this problem I came across your documentation (URL:
> https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams)
> and tried to implement this but it didn't work. I also tried the Table
> API order by, but it seems orderBy is not supported in Flink 1.5.
> Please suggest any workaround for this.
> 
> I have raised same concern on stack overflow
> 
> https://stackoverflow.com/questions/50904615/ordering-of-streams-while-reading-data-from-multiple-kafka-partitions
> 
> Thanks,
> 
> ---
> *Amol Suryawanshi*
> Java Developer
> am...@iprogrammer.com
> 
> 
> *iProgrammer Solutions Pvt. Ltd.*
> 
> 
> 
> *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
> Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
> MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
> www.iprogrammer.com 
> 



Re: DataStreamCalcRule$1802" grows beyond 64 KB when execute long sql.

2018-06-19 Thread Timo Walther

Hi,

this is a known issue that is mentioned in 
https://issues.apache.org/jira/browse/FLINK-8921 and should be fixed 
soon. Currently, we only split by field but for your case we should also 
split expressions. As a workaround you could implement your own scalar 
UDF that contains the case/when logic.


Regards,
Timo


Am 19.06.18 um 07:27 schrieb zhangminglei:

Hi, friends.

When I execute a long SQL query and get the following error, how can I get a 
quick fix?


org.apache.flink.api.common.InvalidProgramException: Table program 
cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
at 
org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
at 
org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Compiling 
"DataStreamCalcRule$1802": Code of method 
"processElement(Ljava/lang/Object;Lorg/apache/flink/streaming/api/functions/ProcessFunction$Context;Lorg/apache/flink/util/Collector;)V" 
of class "DataStreamCalcRule$1802" grows beyond 64 KB

at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)

select case when cast(status as bigint) in (200) then 10 else 1 end as pv,\
case when cast(status as bigint) between 303 and 1000 then 1 when 
cast(status as bigint) between 100 and 199 then 1 else 0 end as 
service_fail,\
case when cast(status as bigint) in (200) then 10 when cast(status as 
bigint) in (301,302) then 1 when cast(status as bigint) between 201 
and 299 then 1 else 0 end as service_success,\
case when cast(status as bigint) not between 100 and 1000 then 1 else 
0 end as network_fail,\

qqiplib(ip, 'isp') as isp,\
case when response_time - request_time <= 6 then response_time - 
request_time else 0 end as response_t,\
case when response_time - request_time <= 6 then 1 else 0 end as 
count_in,\
case when host in 
('116.31.114.22','116.31.114.23','183.60.219.231','183.60.219.232','183.60.219.235','183.60.219.236','183.60.220.231','183.60.220.232',\

'183.60.219.247','183.60.219.248','183.60.219.243','183.60.219.244','183.60.219.251','183.60.219.252','116.31.114.202','116.31.114.204','116.31.114.206'\
,'116.31.114.208') then '佛山力通电信_SR' \
when host in 
('183.232.169.11','183.232.169.12','183.232.169.13','183.232.169.14','183.232.169.15','183.232.169.16','183.232.169.17','183.232.169.18') 
\

then '佛山力通移动_SR' \
when host in 
('112.93.112.11','112.93.112.12','112.93.112.13','112.93.112.14','112.93.112.15','112.93.112.16','112.93.112.17','112.93.112.18') 
\

then '佛山力通联通_SR' \
when host in 
('114.67.56.79','114.67.56.80','114.67.56.83','114.67.56.84','114.67.56.87','114.67.56.88','114.67.56.112','114.67.56.113','114.67.56.116',\

'114.67.56.117','114.67.60.214','114.67.60.215','114.67.54.111','114.67.54.112','114.67.56.95','114.67.56.96','114.67.54.12','114.67.54.13',\
'114.67.56.93') \
then '佛山力通BGP_SR' \
when host in 
('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247',\
'183.60.219.248','114.67.60.201','114.67.60.203','114.67.60.205','114.67.60.207') 
\

then '佛山力通BGP_SR' \
when host in 
('183.240.167.24','183.240.167.25','183.240.167.26','183.240.167.27','183.240.167.28','183.240.167.29','183.240.167.30','183.240.167.31') 
\
then '佛山互联移动_SR' when host in 
('27.45.160.24','27.45.160.25','27.45.160.26','27.45.160.27','27.45.160.28','27.45.160.29','27.45.160.30','27.45.160.31') 
\

then '佛山互联联通_SR' \
when host in 
('43.255.228.11','43.255.228.12','43.255.228.13','43.255.228.14','43.255.228.15','43.255.228.16','43.255.228.17','43.255.228.18','43.255.228.19',\

'43.255.228.20','43.255.228.21','43.255.228.22','43.255.228.23') \
then '佛山互联BGP_SR' \
when host in 
('43.255.228.24','43.255.228.25','43.255.228.26','43.255.228.27','43.255.228.28','43.255.228.29','43.255.228.30','43.255.228.31','43.255.228.32',\

'43.255.228.33','43.255.228.34') \
then '佛山互联BGP_SR' \
when host in 
('14.17.91.113','14.17.91.114','14.17.72.11','14.17.72.12','14.17.72.15','14.17.72.16','14.17.72.19','14.17.72.20','183.61.170.121','183.61.170.122',\
'14.17.91.120','14.17.91.121','14.17.72.79','14.17.72.80','14.17.72.81','14.17.72.82') 
\

then '东莞电信_SR' \
when host in 
('221.228.213.94','221.228.213.95','221.228.213.68','221.228.213.69','221.228.213.76','221.228.213.77','221.228.213.98','221.2

Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-19 Thread Chesnay Schepler
1. PATCH to /jobs/:jobid, you can specify CANCEL/STOP with the "mode" 
query parameter


2. POST to /jobs/:jobid/savepoints, with a json payload. Returns a 
trigger id, used for 3).

{
  "target-directory" : {
    "type" : "string"
  },
  "cancel-job" : {
    "type" : "boolean"
  }
}

3. GET to /jobs/:jobid/savepoints/:triggerid

On 19.06.2018 17:40, Esteban Serrano wrote:

For #1, you need to use a PATCH request to "*/jobs/:jobid*"

On Tue, Jun 19, 2018 at 11:35 AM Siew Wai Yow > wrote:


Hi,


Regarding to Flink 1.5.0 REST API breaking change,

  * /The REST API to cancel a job was changed./
  * /The REST API to cancel a job with savepoint was changed./

I have a few dumb questions,


 1. Any replacement for cancellation ONLY without save-point? Only
found "*/jobs/:jobid/savepoints*".
 2. For "*/jobs/:jobid/savepoints*", how could I form the URL with
cancellation and with directory afterward?
 1. http://192.168.56.151:8081/jobs//savepoints/cancel-job/true??
 3. Any cancellation progress monitoring in 1.5 like previous
version 1.3/1.4?
 1. previous version:
/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId


Thank you.

Regards,
Yow






Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-19 Thread Esteban Serrano
For #1, you need to use a PATCH request to "*/jobs/:jobid*"

On Tue, Jun 19, 2018 at 11:35 AM Siew Wai Yow  wrote:

> Hi,
>
>
> Regarding to Flink 1.5.0 REST API breaking change,
>
>- *The REST API to cancel a job was changed.*
>- *The REST API to cancel a job with savepoint was changed.*
>
> I have a few dumb questions,
>
>
>
>1. Any replacement for cancellation ONLY without save-point? Only
>found "*/jobs/:jobid/savepoints*".
>2. For "*/jobs/:jobid/savepoints*", how could I form the URL with
>cancellation and with directory afterward?
>   1. http://192.168.56.151:8081
>   /jobs//savepoints/cancel-job/true??
>3. Any cancellation progress monitoring in 1.5 like previous version
>1.3/1.4?
>   1. previous version:
>   /jobs/:jobid/cancel-with-savepoint/in-progress/:requestId
>
>
> Thank you.
>
> Regards,
> Yow
>
>
>


Questions regarding to Flink 1.5.0 REST API change

2018-06-19 Thread Siew Wai Yow
Hi,


Regarding to Flink 1.5.0 REST API breaking change,

  *   The REST API to cancel a job was changed.
  *   The REST API to cancel a job with savepoint was changed.

I have a few dumb questions,


  1.  Any replacement for cancellation ONLY without save-point? Only found 
"/jobs/:jobid/savepoints".
  2.  For "/jobs/:jobid/savepoints", how could I form the URL with cancellation 
and with directory afterward?
 *   http://192.168.56.151:8081/jobs//savepoints/cancel-job/true??
  3.  Any cancellation progress monitoring in 1.5 like previous version 1.3/1.4?
 *   previous version: 
/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId

Thank you.

Regards,
Yow




Re: Running with Docker image on ECS/Fargate?

2018-06-19 Thread Sandybayev, Turar (CAI - Atlanta)
Sorry, sent to a wrong group originally. My question is below:

Date: Tuesday, June 19, 2018 at 11:01 AM
To: "d...@flink.apache.org" 
Subject: Running with Docker image on ECS/Fargate?

Hi,

Has anyone tried to run a Flink cluster on AWS ECS? I couldn’t figure out how to 
replicate “docker-compose scale taskmanager=4” with ecs-cli, and looking at 
https://github.com/aws/amazon-ecs-cli/issues/166 , it seems like it’s impossible 
at the moment?

If above is true, is it generally recommended to use Kubernetes/Helm even on 
AWS instead?

Thanks,
Turar


Re: DataStreamCalcRule$1802" grows beyond 64 KB when execute long sql.

2018-06-19 Thread Fabian Hueske
I see, then this case wasn't covered by the fix that we added for Flink
1.5.0.
I guess the problem is that the code is needed to evaluate a single field.

Implementing a scalar user-function is not very difficult [1].
However, you need to register it in the TableEnvironment before you can use
it in a SQL query.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/udfs.html#scalar-functions
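
For illustration, a rough sketch of such a function (the class, function and table names and
the shortened host grouping below are made up; the real eval would contain the full host lists
and CASE branches from the query above):

import org.apache.flink.table.functions.ScalarFunction;

// folds the big CASE WHEN host mapping into plain Java code
public class HostGroup extends ScalarFunction {

    public String eval(String host) {
        if (host == null) {
            return "other";
        }
        switch (host) {
            case "116.31.114.22":
            case "116.31.114.23":
                return "佛山力通电信_SR";
            case "183.232.169.11":
            case "183.232.169.12":
                return "佛山力通移动_SR";
            default:
                return "other";
        }
    }
}

// register it before using it in SQL:
//   tableEnv.registerFunction("hostGroup", new HostGroup());
// and then in the query:
//   SELECT hostGroup(host) AS host_group, ... FROM access_log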

2018-06-19 16:46 GMT+02:00 zhangminglei <18717838...@163.com>:

> Hi Fabian, absolutely, I am using Flink 1.5.0 for this. It is a big CASE WHEN
> statement. Is it hard to implement? I am new to the Flink Table API & SQL.
>
> Best Minglei.
>
> 在 2018年6月19日,下午10:36,Fabian Hueske  写道:
>
> Hi,
>
> Which version are you using? We fixed a similar issue for Flink 1.5.0.
> If you can't upgrade yet, you can also implement a user-defined function
> that evaluates the big CASE WHEN statement.
>
> Best, Fabian
>
> 2018-06-19 16:27 GMT+02:00 zhangminglei <18717838...@163.com>:
>
>> Hi, friends.
>>
>> When I execute a long SQL query and get the following error, how can I get a
>> quick fix?
>>
>> org.apache.flink.api.common.InvalidProgramException: Table program
>> cannot be compiled. This is a bug. Please file an issue.
>> at org.apache.flink.table.codegen.Compiler$class.compile(
>> Compiler.scala:36)
>> at org.apache.flink.table.runtime.CRowProcessRunner.compile(
>> CRowProcessRunner.scala:35)
>> at org.apache.flink.table.runtime.CRowProcessRunner.open(
>> CRowProcessRunner.scala:49)
>> at org.apache.flink.api.common.functions.util.FunctionUtils.ope
>> nFunction(FunctionUtils.java:36)
>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOp
>> erator.open(AbstractUdfStreamOperator.java:102)
>> at org.apache.flink.streaming.api.operators.ProcessOperator.ope
>> n(ProcessOperator.java:56)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllO
>> perators(StreamTask.java:393)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:254)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.RuntimeException: Compiling
>> "DataStreamCalcRule$1802": Code of method "processElement(Ljava/lang/Obj
>> ect;Lorg/apache/flink/streaming/api/functions/ProcessFunctio
>> n$Context;Lorg/apache/flink/util/Collector;)V" of class
>> "DataStreamCalcRule$1802" grows beyond 64 KB
>> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.
>> java:361)
>>
>> select case when cast(status as bigint) in (200) then 10 else 1 end as
>> pv,\
>> case when cast(status as bigint) between 303 and 1000 then 1 when
>> cast(status as bigint) between 100 and 199 then 1 else 0 end as
>> service_fail,\
>> case when cast(status as bigint) in (200) then 10 when cast(status as
>> bigint) in (301,302) then 1 when cast(status as bigint) between 201 and 299
>> then 1 else 0 end as service_success,\
>> case when cast(status as bigint) not between 100 and 1000 then 1 else 0
>> end as network_fail,\
>> qqiplib(ip, 'isp') as isp,\
>> case when response_time - request_time <= 6 then response_time -
>> request_time else 0 end as response_t,\
>> case when response_time - request_time <= 6 then 1 else 0 end as
>> count_in,\
>> case when host in ('116.31.114.22','116.31.114.2
>> 3','183.60.219.231','183.60.219.232','183.60.219.235','183.
>> 60.219.236','183.60.220.231','183.60.220.232',\
>> '183.60.219.247','183.60.219.248','183.60.219.243','183.60.2
>> 19.244','183.60.219.251','183.60.219.252','116.31.114.202','
>> 116.31.114.204','116.31.114.206'\
>> ,'116.31.114.208') then '佛山力通电信_SR' \
>> when host in ('183.232.169.11','183.232.169.12','183.232.169.13','183.232
>> .169.14','183.232.169.15','183.232.169.16','183.232.169.17','183.232.169.18')
>> \
>> then '佛山力通移动_SR' \
>> when host in ('112.93.112.11','112.93.112.12','112.93.112.13','112.93.112
>> .14','112.93.112.15','112.93.112.16','112.93.112.17','112.93.112.18') \
>> then '佛山力通联通_SR' \
>> when host in ('114.67.56.79','114.67.56.80','114.67.56.83','114.67.56.84'
>> ,'114.67.56.87','114.67.56.88','114.67.56.112','114.67.56.11
>> 3','114.67.56.116',\
>> '114.67.56.117','114.67.60.214','114.67.60.215','114.67.54.
>> 111','114.67.54.112','114.67.56.95','114.67.56.96','114.67.
>> 54.12','114.67.54.13',\
>> '114.67.56.93') \
>> then '佛山力通BGP_SR' \
>> when host in ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.
>> 106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247',\
>> '183.60.219.248','114.67.60.201','114.67.60.203','114.67.60.205','114.67.60.207')
>> \
>> then '佛山力通BGP_SR' \
>> when host in ('183.240.167.24','183.240.167.25','183.240.167.26','183.240
>> .167.27','183.240.167.28','183.240.167.29','183.240.167.30','183.240.167.31')
>> \
>> then '佛山互联移动_SR' when host in ('27.45.160.24','27.45.160.25'
>> ,'27.45.160.26','27.45.160.27','27.45.160.28','27.45.16

Re: DataStreamCalcRule$1802" grows beyond 64 KB when execute long sql.

2018-06-19 Thread zhangminglei
Hi Fabian, absolutely, I am using Flink 1.5.0 for this. It is a big CASE WHEN 
statement. Is it hard to implement? I am new to the Flink Table API & SQL.

Best Minglei.

> 在 2018年6月19日,下午10:36,Fabian Hueske  写道:
> 
> Hi,
> 
> Which version are you using? We fixed a similar issue for Flink 1.5.0.
> If you can't upgrade yet, you can also implement a user-defined function that 
> evaluates the big CASE WHEN statement.
> 
> Best, Fabian
> 
> 2018-06-19 16:27 GMT+02:00 zhangminglei <18717838...@163.com 
> >:
> Hi, friends.
> 
> When I execute a long SQL query and get the following error, how can I get a quick 
> fix?
> 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
> at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
> at 
> org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
> at 
> org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Compiling "DataStreamCalcRule$1802": 
> Code of method 
> "processElement(Ljava/lang/Object;Lorg/apache/flink/streaming/api/functions/ProcessFunction$Context;Lorg/apache/flink/util/Collector;)V"
>  of class "DataStreamCalcRule$1802" grows beyond 64 KB
> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
> 
> select case when cast(status as bigint) in (200) then 10 else 1 end as pv,\
> case when cast(status as bigint) between 303 and 1000 then 1 when cast(status 
> as bigint) between 100 and 199 then 1 else 0 end as service_fail,\
> case when cast(status as bigint) in (200) then 10 when cast(status as bigint) 
> in (301,302) then 1 when cast(status as bigint) between 201 and 299 then 1 
> else 0 end as service_success,\
> case when cast(status as bigint) not between 100 and 1000 then 1 else 0 end 
> as network_fail,\
> qqiplib(ip, 'isp') as isp,\
> case when response_time - request_time <= 6 then response_time - 
> request_time else 0 end as response_t,\
> case when response_time - request_time <= 6 then 1 else 0 end as 
> count_in,\
> case when host in 
> ('116.31.114.22','116.31.114.23','183.60.219.231','183.60.219.232','183.60.219.235','183.60.219.236','183.60.220.231','183.60.220.232',\
> '183.60.219.247','183.60.219.248','183.60.219.243','183.60.219.244','183.60.219.251','183.60.219.252','116.31.114.202','116.31.114.204','116.31.114.206'\
> ,'116.31.114.208') then '佛山力通电信_SR' \
> when host in 
> ('183.232.169.11','183.232.169.12','183.232.169.13','183.232.169.14','183.232.169.15','183.232.169.16','183.232.169.17','183.232.169.18')
>  \
> then '佛山力通移动_SR' \
> when host in 
> ('112.93.112.11','112.93.112.12','112.93.112.13','112.93.112.14','112.93.112.15','112.93.112.16','112.93.112.17','112.93.112.18')
>  \
> then '佛山力通联通_SR' \
> when host in 
> ('114.67.56.79','114.67.56.80','114.67.56.83','114.67.56.84','114.67.56.87','114.67.56.88','114.67.56.112','114.67.56.113','114.67.56.116',\
> '114.67.56.117','114.67.60.214','114.67.60.215','114.67.54.111','114.67.54.112','114.67.56.95','114.67.56.96','114.67.54.12','114.67.54.13',\
> '114.67.56.93') \
> then '佛山力通BGP_SR' \
> when host in 
> ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247',\
> '183.60.219.248','114.67.60.201','114.67.60.203','114.67.60.205','114.67.60.207')
>  \
> then '佛山力通BGP_SR' \
> when host in 
> ('183.240.167.24','183.240.167.25','183.240.167.26','183.240.167.27','183.240.167.28','183.240.167.29','183.240.167.30','183.240.167.31')
>  \
> then '佛山互联移动_SR' when host in 
> ('27.45.160.24','27.45.160.25','27.45.160.26','27.45.160.27','27.45.160.28','27.45.160.29','27.45.160.30','27.45.160.31')
>  \
> then '佛山互联联通_SR' \
> when host in 
> ('43.255.228.11','43.255.228.12','43.255.228.13','43.255.228.14','43.255.228.15','43.255.228.16','43.255.228.17','43.255.228.18','43.255.228.19',\
> '43.255.228.20','43.255.228.21','43.255.228.22','43.255.228.23') \
> then '佛山互联BGP_SR' \
> when host in 
> ('43.255.228.24','43.255.228.25','43.255.228.26','43.255.228.27','43.255.228.28','43.255.228.29','43.255.228.30','43.255.228.31','43.255.228.32',\
> '43.255.228.33','43.255.228.34') \
> then '佛山互联BGP_SR' \
> when host 

Re: # of active session windows of a streaming job

2018-06-19 Thread Dongwon Kim
Hi Fabian,
Thanks a lot for your reply.

Do you need to number of active session windows as a DataStream or would
> you like to have it as a metric that you can expose.
> I possible, I would recommend to expose it as a metric because they are
> usually easier to collect.

I want to have it as a metric and it doesn't look difficult thanks to the
metric system exposed by TriggerContext.

In order to track how many session windows exist, we would need to
> increment a counter by one when a new window is created (or an element is
> assigned to a window, which is equivalent for session windows)

I agree with you that we need to increment a counter when
Trigger.onElement() is called due to the characteristic of session windows.

and decrement the counter when windows are merged by the number of merged
> windows minus one.

You decrement the counter when windows are merged, but I think we need to
decrement the counter when a window is expired as well.

However, decrementing the counter is difficult. Although the
> Trigger.onMerge() method is called, it does not know how many windows were
> merged (which is done by the WindowAssigner) and only sees the merged
> window.

We assume that timestamps of records from a user are in ascending order, so
only one window is closed at a time which simplifies the problem of how to
decrement the counter.
Nevertheless, I think I need to decrement the counter in Trigger.onClose(),
not Trigger.onMerge().
By doing that in Trigger.onClose(), we can take care of both cases: when a
window is merged and when a window is expired.
How do you think about it?
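
Here is a very rough sketch of what I have in mind (the class and metric names are made up;
it mirrors ProcessingTimeTrigger, and the count is approximate, per subtask, and not
checkpointed):

import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class SessionCountingTrigger extends Trigger<Object, TimeWindow> {

    private static final long serialVersionUID = 1L;

    private transient Counter activeSessions;

    private Counter counter(TriggerContext ctx) {
        if (activeSessions == null) {
            activeSessions = ctx.getMetricGroup().counter("activeSessions");
        }
        return activeSessions;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        // every element first gets its own session window, so count a new window here
        counter(ctx).inc();
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // the merged-away windows are cleared by the window operator,
        // which decrements the counter in clear() below
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        // called when a window is merged away or expires: it is no longer active
        counter(ctx).dec();
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}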

The reason I mention state is to calculate the exact number of active
sessions even after my Flink application is restarted from checkpoints or
savepoints.
If we restore from a savepoint and the counter is initialized to 0, we'll
see an incorrect value from a dashboard.
This is the biggest concern of mine at this point.

Best,

- Dongwon


On Tue, Jun 19, 2018 at 7:14 PM, Fabian Hueske  wrote:

> Hi Dongwon,
>
> Do you need to number of active session windows as a DataStream or would
> you like to have it as a metric that you can expose.
> I possible, I would recommend to expose it as a metric because they are
> usually easier to collect.
>
> SessionWindows work internally as follows:
> - every new record is added to a new window that starts at the timestamp
> of the record and ends at timestamp + gap size. When a record is added to a
> window, Trigger.onElement() is called.
> - after a window was created, the session window assigner tries to merge
> window with overlapping ranges. When windows are merged, Trigger.onMerge()
> is called.
>
> In order to track how many session windows exist, we would need to
> increment a counter by one when a new window is created (or an element is
> assigned to a window, which is equivalent for session windows) and
> decrement the counter when windows are merged by the number of merged
> windows minus one.
>
> Incrementing the counter is rather easy and can be done in
> Trigger.onElement(), either by using state or a Counter metric (Triggers
> have access to the metric system).
> However, decrementing the counter is difficult. Although the
> Trigger.onMerge() method is called, it does not know how many windows were
> merged (which is done by the WindowAssigner) and only sees the merged
> window. There might be a way to maintain state in a Trigger that allows to
> infer how many windows were merged.
>
> Best, Fabian
>
> 2018-06-16 16:39 GMT+02:00 Dongwon Kim :
>
>> Hi Fabian,
>>
>> I'm still eager to expose # of active sessions as a key metric of our
>> service but I haven’t figured it out yet.
>>
>> First of all, I want to ask you some questions regarding your suggestion.
>>
>> You could implement a Trigger that fires when a new window is created and
>> when the window is closed. A ProcessWindowFunction would emit a +1 if the
>> window was created and a -1 when the window is closes.
>> Session windows are a bit special, because you also need to handle the
>> case of merging windows, i.e., two opened windows can be merged and only
>> one (the merged) window is closed. So would need to emit a -2 if a merged
>> window was closes (assuming only two windows were merged).
>>
>> Q1)
>> How to fire when a new window is created and when the window is closed?
>> AFAIK, we can return TriggerResult only through the three functions:
>> onElement, onEventTime, and onProcessingTime.
>> Q2)
>> Firing is to emit elements in windows down to the window function, not
>> emitting values like +1, -1 and -2 which are not in windows.
>> Or do I miss something that you meant?
>>
>> In order to do that, you'd need to carry the merging information forward.
>> The Trigger.onMerge method cannot trigger the window function, but it could
>> store the merging information in state that is later accessed.
>>
>> Q3)
>> I didn't understand what you mean at all. What do you mean by carrying
>> the merging information?
>>
>> Besides your

Re: DataStreamCalcRule$1802" grows beyond 64 KB when execute long sql.

2018-06-19 Thread Fabian Hueske
Hi,

Which version are you using? We fixed a similar issue for Flink 1.5.0.
If you can't upgrade yet, you can also implement a user-defined function
that evaluates the big CASE WHEN statement.

Best, Fabian

2018-06-19 16:27 GMT+02:00 zhangminglei <18717838...@163.com>:

> Hi, friends.
>
> When I execute a long SQL query and get the following error, how can I get a quick
> fix?
>
> org.apache.flink.api.common.InvalidProgramException: Table program cannot
> be compiled. This is a bug. Please file an issue.
> at org.apache.flink.table.codegen.Compiler$class.
> compile(Compiler.scala:36)
> at org.apache.flink.table.runtime.CRowProcessRunner.
> compile(CRowProcessRunner.scala:35)
> at org.apache.flink.table.runtime.CRowProcessRunner.
> open(CRowProcessRunner.scala:49)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.
> AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at org.apache.flink.streaming.api.operators.ProcessOperator.
> open(ProcessOperator.java:56)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:393)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Compiling
> "DataStreamCalcRule$1802": Code of method "processElement(Ljava/lang/
> Object;Lorg/apache/flink/streaming/api/functions/
> ProcessFunction$Context;Lorg/apache/flink/util/Collector;)V" of class
> "DataStreamCalcRule$1802" grows beyond 64 KB
> at org.codehaus.janino.UnitCompiler.compileUnit(
> UnitCompiler.java:361)
>
> select case when cast(status as bigint) in (200) then 10 else 1 end as pv,\
> case when cast(status as bigint) between 303 and 1000 then 1 when
> cast(status as bigint) between 100 and 199 then 1 else 0 end as
> service_fail,\
> case when cast(status as bigint) in (200) then 10 when cast(status as
> bigint) in (301,302) then 1 when cast(status as bigint) between 201 and 299
> then 1 else 0 end as service_success,\
> case when cast(status as bigint) not between 100 and 1000 then 1 else 0
> end as network_fail,\
> qqiplib(ip, 'isp') as isp,\
> case when response_time - request_time <= 6 then response_time -
> request_time else 0 end as response_t,\
> case when response_time - request_time <= 6 then 1 else 0 end as
> count_in,\
> case when host in ('116.31.114.22','116.31.114.
> 23','183.60.219.231','183.60.219.232','183.60.219.235','
> 183.60.219.236','183.60.220.231','183.60.220.232',\
> '183.60.219.247','183.60.219.248','183.60.219.243','183.60.
> 219.244','183.60.219.251','183.60.219.252','116.31.114.
> 202','116.31.114.204','116.31.114.206'\
> ,'116.31.114.208') then '佛山力通电信_SR' \
> when host in ('183.232.169.11','183.232.169.12','183.232.169.13','183.
> 232.169.14','183.232.169.15','183.232.169.16','183.232.169.17','183.232.169.18')
> \
> then '佛山力通移动_SR' \
> when host in ('112.93.112.11','112.93.112.12','112.93.112.13','112.93.
> 112.14','112.93.112.15','112.93.112.16','112.93.112.17','112.93.112.18') \
> then '佛山力通联通_SR' \
> when host in ('114.67.56.79','114.67.56.80','114.67.56.83','114.67.56.84'
> ,'114.67.56.87','114.67.56.88','114.67.56.112','114.67.56.
> 113','114.67.56.116',\
> '114.67.56.117','114.67.60.214','114.67.60.215','114.67.
> 54.111','114.67.54.112','114.67.56.95','114.67.56.96','114.
> 67.54.12','114.67.54.13',\
> '114.67.56.93') \
> then '佛山力通BGP_SR' \
> when host in ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.
> 56.106','114.67.56.107','183.60.220.231','183.60.220.232','
> 183.60.219.247',\
> '183.60.219.248','114.67.60.201','114.67.60.203','114.67.60.205','114.67.60.207')
> \
> then '佛山力通BGP_SR' \
> when host in ('183.240.167.24','183.240.167.25','183.240.167.26','183.
> 240.167.27','183.240.167.28','183.240.167.29','183.240.167.30','183.240.167.31')
> \
> then '佛山互联移动_SR' when host in ('27.45.160.24','27.45.160.25'
> ,'27.45.160.26','27.45.160.27','27.45.160.28','27.45.160.29'
> ,'27.45.160.30','27.45.160.31') \
> then '佛山互联联通_SR' \
> when host in ('43.255.228.11','43.255.228.12','43.255.228.13','43.255.
> 228.14','43.255.228.15','43.255.228.16','43.255.228.17','
> 43.255.228.18','43.255.228.19',\
> '43.255.228.20','43.255.228.21','43.255.228.22','43.255.228.23') \
> then '佛山互联BGP_SR' \
> when host in ('43.255.228.24','43.255.228.25','43.255.228.26','43.255.
> 228.27','43.255.228.28','43.255.228.29','43.255.228.30','
> 43.255.228.31','43.255.228.32',\
> '43.255.228.33','43.255.228.34') \
> then '佛山互联BGP_SR' \
> when host in ('14.17.91.113','14.17.91.114','14.17.72.11','14.17.72.12','
> 14.17.72.15','14.17.72.16','14.17.72.19','14.17.72.20','
> 183.61.170.121','183.61.170.122',\
> '14.17.91.120','14.17.91.121','14.17.72.79','14.17.72.80','14.17.72.81','14.17.72

DataStreamCalcRule$1802" grows beyond 64 KB when execute long sql.

2018-06-19 Thread zhangminglei
Hi, friends.

When I execute a long SQL query and get the following error, how can I get a quick fix?

org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
at 
org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
at 
org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Compiling "DataStreamCalcRule$1802": 
Code of method 
"processElement(Ljava/lang/Object;Lorg/apache/flink/streaming/api/functions/ProcessFunction$Context;Lorg/apache/flink/util/Collector;)V"
 of class "DataStreamCalcRule$1802" grows beyond 64 KB
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)

select case when cast(status as bigint) in (200) then 10 else 1 end as pv,\
case when cast(status as bigint) between 303 and 1000 then 1 when cast(status 
as bigint) between 100 and 199 then 1 else 0 end as service_fail,\
case when cast(status as bigint) in (200) then 10 when cast(status as bigint) 
in (301,302) then 1 when cast(status as bigint) between 201 and 299 then 1 else 
0 end as service_success,\
case when cast(status as bigint) not between 100 and 1000 then 1 else 0 end as 
network_fail,\
qqiplib(ip, 'isp') as isp,\
case when response_time - request_time <= 6 then response_time - 
request_time else 0 end as response_t,\
case when response_time - request_time <= 6 then 1 else 0 end as count_in,\
case when host in 
('116.31.114.22','116.31.114.23','183.60.219.231','183.60.219.232','183.60.219.235','183.60.219.236','183.60.220.231','183.60.220.232',\
'183.60.219.247','183.60.219.248','183.60.219.243','183.60.219.244','183.60.219.251','183.60.219.252','116.31.114.202','116.31.114.204','116.31.114.206'\
,'116.31.114.208') then '佛山力通电信_SR' \
when host in 
('183.232.169.11','183.232.169.12','183.232.169.13','183.232.169.14','183.232.169.15','183.232.169.16','183.232.169.17','183.232.169.18')
 \
then '佛山力通移动_SR' \
when host in 
('112.93.112.11','112.93.112.12','112.93.112.13','112.93.112.14','112.93.112.15','112.93.112.16','112.93.112.17','112.93.112.18')
 \
then '佛山力通联通_SR' \
when host in 
('114.67.56.79','114.67.56.80','114.67.56.83','114.67.56.84','114.67.56.87','114.67.56.88','114.67.56.112','114.67.56.113','114.67.56.116',\
'114.67.56.117','114.67.60.214','114.67.60.215','114.67.54.111','114.67.54.112','114.67.56.95','114.67.56.96','114.67.54.12','114.67.54.13',\
'114.67.56.93') \
then '佛山力通BGP_SR' \
when host in 
('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247',\
'183.60.219.248','114.67.60.201','114.67.60.203','114.67.60.205','114.67.60.207')
 \
then '佛山力通BGP_SR' \
when host in 
('183.240.167.24','183.240.167.25','183.240.167.26','183.240.167.27','183.240.167.28','183.240.167.29','183.240.167.30','183.240.167.31')
 \
then '佛山互联移动_SR' when host in 
('27.45.160.24','27.45.160.25','27.45.160.26','27.45.160.27','27.45.160.28','27.45.160.29','27.45.160.30','27.45.160.31')
 \
then '佛山互联联通_SR' \
when host in 
('43.255.228.11','43.255.228.12','43.255.228.13','43.255.228.14','43.255.228.15','43.255.228.16','43.255.228.17','43.255.228.18','43.255.228.19',\
'43.255.228.20','43.255.228.21','43.255.228.22','43.255.228.23') \
then '佛山互联BGP_SR' \
when host in 
('43.255.228.24','43.255.228.25','43.255.228.26','43.255.228.27','43.255.228.28','43.255.228.29','43.255.228.30','43.255.228.31','43.255.228.32',\
'43.255.228.33','43.255.228.34') \
then '佛山互联BGP_SR' \
when host in 
('14.17.91.113','14.17.91.114','14.17.72.11','14.17.72.12','14.17.72.15','14.17.72.16','14.17.72.19','14.17.72.20','183.61.170.121','183.61.170.122',\
'14.17.91.120','14.17.91.121','14.17.72.79','14.17.72.80','14.17.72.81','14.17.72.82')
 \
then '东莞电信_SR' \
when host in 
('221.228.213.94','221.228.213.95','221.228.213.68','221.228.213.69','221.228.213.76','221.228.213.77','221.228.213.98','221.228.213.99',\
'221.228.213.122','221.228.213.124') \
then '无锡电信_SR' \
when host in 
('43.247.88.49','43.247.88.50','43.247.88.135','43.247.88.136','43.247.88.16','43.247.88.17','43.247.88.143','43.247.88.144','43.247.88.183','43.247.88.185',\
'43.247.88.187','43.247.88.189') \
then '无锡BGP_SR' \
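
A common workaround for the JVM's 64 KB per-method limit is to move the long
CASE WHEN host IN (...) chain out of the generated code and into a user-defined
scalar function, so that the generated processElement() only contains a single
function call. A minimal sketch, assuming the Table API's ScalarFunction; the
class name HostGroup and the host-to-label entries are illustrative:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.functions.ScalarFunction;

// illustrative UDF replacing the long CASE WHEN host IN (...) chain
public class HostGroup extends ScalarFunction {

    private static final Map<String, String> HOST_TO_GROUP = new HashMap<>();
    static {
        // fill in the real host -> label pairs from the query
        HOST_TO_GROUP.put("116.31.114.22", "佛山力通电信_SR");
        HOST_TO_GROUP.put("183.232.169.11", "佛山力通移动_SR");
        // ...
    }

    // called by the SQL runtime for every row
    public String eval(String host) {
        return HOST_TO_GROUP.getOrDefault(host, "OTHER");
    }
}

// registration on a StreamTableEnvironment named tableEnv (name is illustrative):
// tableEnv.registerFunction("hostGroup", new HostGroup());
// ... and in the query: hostGroup(host) as host_group

Splitting the statement into several smaller queries or views is another way to
keep each generated method below the limit.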

Re: Flink application does not scale as expected, please help!

2018-06-19 Thread Siew Wai Yow
Thanks @Fabian and @Till for the good explanation. The picture is pretty clear 
now. As for the single-slot TM test, it seems to allocate the slots of the same 
machine first as well, but with that the result is less spiky. I guess @Fabian 
is right that the network is our bottleneck in this case because we only have 
1 Gbps. Will fix it and test again.


Thanks guys, including @Sihua Zhou who helped to test the program in his lab :)



From: Till Rohrmann 
Sent: Tuesday, June 19, 2018 3:48 PM
To: wai_...@hotmail.com
Cc: ovidiu-cristian.ma...@inria.fr; Fabian Hueske; Jörn Franke; user
Subject: Re: Flink application does not scale as expected, please help!

Hi,

as Fabian explained, if you exceed the number of slots on a single TM, then 
Flink needs to deploy tasks on other TMs which causes a network transfer 
between the sources and the mappers. This will have a negative impact if you 
compare it to a setup where everything runs locally.

The scheduling in Flink works as follows: Flink will try to co-locate consumer 
tasks with their producers in order to avoid costly network communication. If 
there are more than 8 producer tasks, then it is assumed that the consumer can 
be scheduled everywhere because you might only achieve to read 1/8 of your 
input locally.

The question now is how the sources are scheduled, because they determine how 
the mappers are scheduled. For tasks with no inputs, Flink will pick a random 
slot without preferences for spreading them across as many TMs as possible or 
packing them as closely together as possible. The reason is that both 
strategies might perform better depending on the actual use case. Moreover, the 
way the scheduling works right now, the scheduler does not know how many 
sources will be scheduled. This means that you don't necessarily know how many 
TMs you have to start in order to execute all source tasks and, thus, you don't 
know across how many TMs you have to spread your sources. Due to the 
implementation of the SlotManager, which is responsible for the slot management 
on the RM, we will first allocate all slots from a single TM before allocating 
slots from another TM.

Due to the setup of your test (different parallelism of sources and mappers), 
you introduce a shuffle between the sources and the Json mappers. This is not 
optimal and I would suggest to instantiate as many sources as you have mappers. 
That way these two operators can be chained and run in the same thread. This 
would also spread the load of generating events across the whole cluster 
instead of letting a single machine be responsible for producing all events. 
Moreover, in order to get rid of the measurement artifacts introduced by 
switching from local to remote communication you could start your TMs with a 
single slot as Fabian suggested. Otherwise you might see a slight drop whenever 
you exceed the multiple of 16 by one.

As a side node, if you don't need a shuffle when changing the parallelism 
between two operators, then you could also use the rescale command which tries 
to minimize the communication between two operators. For example, if your 
source has a parallelism of 2 and your mapper of 4, then rescale would cause 
the first source to talk to mapper 1 and 2 and the second source to only talk 
to mapper 3 and 4.

Cheers,
Till



From: Fabian Hueske 
Sent: Tuesday, June 19, 2018 3:55 PM
To: Siew Wai Yow
Cc: Ovidiu-Cristian MARCU; Jörn Franke; user@flink.apache.org; 
trohrm...@apache.org
Subject: Re: Flink application does not scale as expected, please help!

Hi Siew,

The hint about the lower source parallelism compared to the operator 
parallelism might be the right one.

Can you check if all source tasks are scheduled to the same machine?
In that case your application might be bottlenecked by the out-going network 
connection of that single machine.

Best, Fabian


On Mon, Jun 18, 2018 at 5:30 PM Siew Wai Yow 
mailto:wai_...@hotmail.com>> wrote:

Thanks @Fabian for your confirmation about the better performance when scaling 
happened at same TM machine. But it is so funny that it give impression "the 
more I scale the less I get" when the performance drop with more TM in play.

@Ovidiu question is interesting to know too. @Till do you mind to share your 
thoughts?

Thank you guys!


From: Ovidiu-Cristian MARCU 
mailto:ovidiu-cristian.ma...@inria.fr>>
Sent: Monday, June 18, 2018 6:28 PM
To: Fabian Hueske
Cc: Siew Wai Yow; Jörn Franke; 
user@flink.apache.org
Subject: Re: Flink application does not scale as expected, please help!

Hi all,

Allow me to add some comments/questions on this issue that is very interesting.
According to documentation [1] the pipeline example assumes the source is 
running with the same parallelism as successive map operator and the workflow 
optimizes to collocate source and map tasks if possible.

For an application configu

Re: Flink kafka consumer stopped committing offsets

2018-06-19 Thread Juho Autio
Hi,

Thanks for your analysis.

We found LeaderElectionRateAndTimeMs go to non-zero value on Kafka around
the same time when this error was seen in the Flink job.

Kafka itself recovers from this and so do any other consumers that we have.
It seems like a bug in kafka consumer library if this error causes it to
stop committing offsets. If you have any further insight to this, please
let me know.

Apart from that, leader election doesn't happen in normal situation. But it
can happen for example if there are connectivity problems between the Kafka
nodes.

On Mon, Jun 11, 2018 at 6:41 PM amit pal  wrote:

> Probably your Kafka consumer is rebalancing. This can happen when message
> processing takes so long that the Kafka broker marks your consumer as dead
> and rebalances. This all happens before the consumer can commit the
> offsets.
>
> On Mon, Jun 11, 2018 at 7:37 PM Piotr Nowojski 
> wrote:
>
>> The more I look into it, the more it seems like a Kafka bug or some
>> cluster failure from which your Kafka cluster did not recover.
>>
>> In your cases auto committing should be set to true and in that case
>> KafkaConsumer should commit offsets once every so often when it’s polling
>> messages. Unless for example `cordinatorUnknown()` returns false in
>> `org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#maybeAutoCommitOffsetsAsync`
>> (Kafka 0.10.2.1 code base):
>>
>> private void maybeAutoCommitOffsetsAsync(long now) {
>> if (autoCommitEnabled) {
>> if (coordinatorUnknown()) {
>> this.nextAutoCommitDeadline = now + retryBackoffMs;
>> } else if (now >= nextAutoCommitDeadline) {
>> this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
>> doAutoCommitOffsetsAsync();
>> }
>> }
>> }
>>
>> Have you checked Kafka logs? This suggests that the real problem is
>> hidden behind:
>>
>> >  INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator
>>  - Marking the coordinator my-kafka-host-10-1-16-
>> 97.cloud-internal.mycompany.com:9092 (id: 2147483550
>> rack: null) dead for group
>> aggregate-all_server_measurements_combined-20180606-1000
>>
>> And maybe your Kafka cluster/consumer can not recover from this situation.
>>
>> Another thing to try (simpler) is to just trying upgrading Kafka cluster.
>>
>> Piotrek
>>
>> On 11 Jun 2018, at 11:44, Juho Autio  wrote:
>>
>> Hi Piotr, thanks for your insights.
>>
>> > What’s your KafkaConsumer configuration?
>>
>> We only set these in the properties that are passed to
>> FlinkKafkaConsumer010 constructor:
>>
>> auto.offset.reset=latest
>> bootstrap.servers=my-kafka-host:9092
>> group.id=my_group
>> flink.partition-discovery.interval-millis=3
>>
>> > is checkpointing enabled?
>>
>> No.
>>
>> > enable.auto.commit (or auto.commit.enable for Kafka 0.8) /
>> auto.commit.interval.ms
>>
>> We have whatever is the default behaviour of Flink kafka consumer. It
>> seems to commit quite often, something like every 5 seconds.
>>
>> > did you set setCommitOffsetsOnCheckpoints() ?
>>
>> No. But I checked with debugger that
>> apparently enableCommitOnCheckpoints=true is the default.
>>
>> I also checked with debugger that offsetCommitMode=KAFKA_PERIODIC.
>>
>> So I guess you're right that this bug doesn't seem to be in Flink itself?
>> I wonder if it's a known issue in Kafka client lib..
>>
>> I also took thread dump on one of the task managers in this broken state.
>> But I couldn't spot anything obvious when comparing the threads to a dump
>> from a job where offsets are being committed. Any way I've saved the thread
>> dump in case there's something to look for specifically.
>>
>> Sharing the full logs of job & task managers would be a bit of a hassle,
>> because I don't have an automatic way to obfuscate the logs so that I'm
>> sure that there isn't anything sensitive left. Any way, there isn't
>> anything else to share really. I wrote: "As you can see, it didn't log
>> anything until ~2018-06-07 22:08. Also that's where the log ends".
>>
>> Thanks once more.
>>
>> On Mon, Jun 11, 2018 at 11:18 AM, Piotr Nowojski > > wrote:
>>
>>> Hi,
>>>
>>> What’s your KafkaConsumer configuration? Especially values for:
>>> - is checkpointing enabled?
>>> - enable.auto.commit (or auto.commit.enable for Kafka 0.8) /
>>> auto.commit.interval.ms
>>> - did you set setCommitOffsetsOnCheckpoints() ?
>>>
>>> Please also refer to
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
>>>  ,
>>> especially this part:
>>>
>>> > Note that the Flink Kafka Consumer does not rely on the committed
>>> offsets for fault tolerance guarantees. The committed offsets are only a
>>> means to expose the consumer’s progress for monitoring purposes.
>>>
>>> Can you post full logs from all TaskManagers/JobManager and can you
>>> say/estimate when did the committing brake/stop? Did you check Kafka logs
>>> for a

Re: Breakage in Flink CLI in 1.5.0

2018-06-19 Thread Chesnay Schepler
In 1.5 we reworked the job-submission to go through the REST API instead 
of akka.


I believe the jobmanager rpc port shouldn't be necessary anymore, the 
rpc address is still required due to some technical implementations; 
it may be that you can set this to some arbitrary value however.


As a result the REST API (i.e. the web server) must be running in order 
to submit jobs.
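
For reference, an illustrative flink-conf.yaml fragment for the 1.5 client
(hostnames are placeholders); the client still parses the RPC address while the
actual job submission goes through the REST endpoint:

# still read by the client:
jobmanager.rpc.address: flink-jobmanager
jobmanager.rpc.port: 6123
# REST endpoint the CLI uses for job submission:
rest.address: flink-jobmanager
rest.port: 8081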


On 19.06.2018 14:12, Sampath Bhat wrote:

Hello

I'm using Flink 1.5.0 version and Flink CLI to submit jar to flink 
cluster.


In flink 1.4.2 only job manager rpc address and job manager rpc port 
were sufficient for flink client to connect to job manager and submit 
the job.


But in flink 1.5.0 the flink client additionally requires the 
rest.address and rest.port for submitting the job to job manager. What 
is the advantage of this new method over the 1.4.2 method of 
submitting job?


Moreover if we make rest.port = -1 the web server will not be 
instantiated then how should we submit the job?


Regards
Sampath





Re:Ordering of stream from different kafka partitions

2018-06-19 Thread sihua zhou


Hi Amol,


I'm not sure this is even possible, especially when you need to process the 
records with multiple parallelism. 


IMO, in theory, we can only get an ordered stream when there is a single 
partition of Kafka and we process it with a single parallelism in Flink. Even in 
this case, if you only want to order the records within a window, then you need 
to store the records in state and order them when the window is triggered. But 
if you want to order the records with a single `keyBy()` (non-windowed), I think 
that's maybe impossible in practice, because you would need to store all the 
incoming records, re-sort them for every incoming record, and also send a 
retraction message for the previous result (because every incoming record might 
change the global order of the records).


Best, Sihua
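
A minimal sketch of the windowed variant described above (ordering only within
each window, with a non-parallel windowAll). It assumes events is an existing
DataStream<Event>, Event has a public long timestamp field, and event time is
enabled on the environment:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// assumes env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
DataStream<Event> ordered = events
    // tolerate up to 5 seconds of out-of-orderness between partitions
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Event e) {
                return e.timestamp;
            }
        })
    // buffer one window's worth of records and emit them sorted (parallelism 1)
    .timeWindowAll(Time.seconds(10))
    .process(new ProcessAllWindowFunction<Event, Event, TimeWindow>() {
        @Override
        public void process(Context ctx, Iterable<Event> elements, Collector<Event> out) {
            List<Event> buffer = new ArrayList<>();
            for (Event e : elements) {
                buffer.add(e);
            }
            buffer.sort(Comparator.comparingLong((Event e) -> e.timestamp));
            for (Event e : buffer) {
                out.collect(e);
            }
        }
    });

Records are only sorted within each window and are not emitted before the window
closes.
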
On 06/19/2018 19:19,Amol S - iProgrammer wrote:
Hi,

I have used the Flink streaming API in my application where the source of the
stream is Kafka. My Kafka producer publishes data in ascending order of time
into different partitions, and the consumer reads data from these partitions.
However, some Kafka partitions may be slow due to some operation and produce
late results. Is there any way to maintain order in this stream even though the
data arrives out of order? I have tried BoundedOutOfOrdernessTimestampExtractor
but it didn't serve the purpose. While digging into this problem I came across
your documentation (URL:
https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams)
and tried to implement it, but it didn't work. I also tried the Table API order
by, but it seems orderBy is not supported in Flink 1.5. Please suggest a
workaround for this.

I have raised same concern on stack overflow

https://stackoverflow.com/questions/50904615/ordering-of-streams-while-reading-data-from-multiple-kafka-partitions

Thanks,

---
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com


*iProgrammer Solutions Pvt. Ltd.*



*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com 



Breakage in Flink CLI in 1.5.0

2018-06-19 Thread Sampath Bhat
Hello

I'm using Flink 1.5.0 version and Flink CLI to submit jar to flink cluster.

In flink 1.4.2 only job manager rpc address and job manager rpc port were
sufficient for flink client to connect to job manager and submit the job.

But in flink 1.5.0 the flink client additionally requires the rest.address
and rest.port for submitting the job to job manager. What is the advantage
of this new method over the 1.4.2 method of submitting job?

Moreover if we make rest.port = -1 the web server will not be instantiated
then how should we submit the job?

Regards
Sampath


Re: Exception while submitting jobs through Yarn

2018-06-19 Thread Ted Yu
Since you're using a vendor's distro, I would suggest asking on their user 
forum.
Cheers
 Original message 
From: Garvit Sharma
Date: 6/19/18 3:34 AM (GMT-08:00)
To: trohrm...@apache.org
Cc: Amit Jain, Chesnay Schepler, Ted Yu, user@flink.apache.org
Subject: Re: Exception while submitting jobs through Yarn
Any help on this?
On Mon, Jun 18, 2018 at 11:31 PM Garvit Sharma  wrote:
Yes, it is. 

On Mon, Jun 18, 2018 at 7:54 PM Till Rohrmann  wrote:
Is `/usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl.jar` a link to 
`/usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl-2.9.1.jar`?
On Mon, Jun 18, 2018 at 4:02 PM Garvit Sharma  wrote:
I don't think I can access core-default as it comes with Hadoop jar
On Mon, 18 Jun 2018 at 7:30 PM, Till Rohrmann  wrote:
Hmm, could you check whether core-default.xml contains any suspicious entries? 
Apparently xerces:2.9.1 cannot read it.
On Mon, Jun 18, 2018 at 3:40 PM Garvit Sharma  wrote:
Hi,
After putting the following log in my code, I can see that the Xerces version
is: Xerces-J 2.9.1

log.info("Xerces version : {}", org.apache.xerces.impl.Version.getVersion());

Also, following is the response of the $ locate xerces command on the server -

 





/usr/hdp/2.6.1.0-129/falcon/client/lib/xercesImpl-2.10.0.jar
/usr/hdp/2.6.1.0-129/hadoop/client/xercesImpl-2.9.1.jar
/usr/hdp/2.6.1.0-129/hadoop/client/xercesImpl.jar
/usr/hdp/2.6.1.0-129/hadoop-hdfs/lib/xercesImpl-2.9.1.jar
/usr/hdp/2.6.1.0-129/hbase/lib/xercesImpl-2.9.1.jar
/usr/hdp/2.6.1.0-129/hive-hcatalog/share/webhcat/svr/lib/xercesImpl-2.9.1.jar
/usr/hdp/2.6.1.0-129/livy/jars/xercesImpl-2.9.1.jar
/usr/hdp/2.6.1.0-129/livy2/jars/xercesImpl-2.9.1.jar
/usr/hdp/2.6.1.0-129/oozie/lib/xercesImpl-2.10.0.jar
/usr/hdp/2.6.1.0-129/oozie/libserver/xercesImpl-2.10.0.jar
/usr/hdp/2.6.1.0-129/oozie/libtools/xercesImpl-2.10.0.jar
/usr/hdp/2.6.1.0-129/slider/lib/xercesImpl-2.9.1.jar
/usr/hdp/2.6.1.0-129/spark2/jars/xercesImpl-2.9.1.jar
/usr/hdp/2.6.1.0-129/storm/contrib/storm-autocreds/xercesImpl-2.9.1.jar
/usr/hdp/2.6.1.0-129/zookeeper/lib/xercesMinimal-1.9.6.2.jar
/usr/hdp/2.6.3.0-235/falcon/client/lib/xercesImpl-2.10.0.jar
/usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl-2.9.1.jar
/usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl.jar
/usr/hdp/2.6.3.0-235/hadoop-hdfs/lib/xercesImpl-2.9.1.jar
/usr/hdp/2.6.3.0-235/hbase/lib/xercesImpl-2.9.1.jar
/usr/hdp/2.6.3.0-235/hive-hcatalog/share/webhcat/svr/lib/xercesImpl-2.9.1.jar
/usr/hdp/2.6.3.0-235/livy/jars/xercesImpl-2.9.1.jar
/usr/hdp/2.6.3.0-235/livy2/jars/xercesImpl-2.9.1.jar
/usr/hdp/2.6.3.0-235/oozie/lib/xercesImpl-2.10.0.jar


Re: Exception while submitting jobs through Yarn

2018-06-19 Thread Garvit Sharma
Any help on this?

On Mon, Jun 18, 2018 at 11:31 PM Garvit Sharma  wrote:

> Yes, it is.
>
> On Mon, Jun 18, 2018 at 7:54 PM Till Rohrmann 
> wrote:
>
>> Is `/usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl.jar` a link to `
>> /usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl-2.9.1.jar`?
>>
>> On Mon, Jun 18, 2018 at 4:02 PM Garvit Sharma 
>> wrote:
>>
>>> I don't think I can access core-default as it comes with Hadoop jar
>>>
>>> On Mon, 18 Jun 2018 at 7:30 PM, Till Rohrmann 
>>> wrote:
>>>
 Hmm, could you check whether core-default.xml contains any suspicious
 entries? Apparently xerces:2.9.1 cannot read it.

 On Mon, Jun 18, 2018 at 3:40 PM Garvit Sharma 
 wrote:

> Hi,
>
> After putting the following log in my code, I can see that the Xerces
> version is - Xerces version : Xerces-J 2.9.1
>
> log.info("Xerces version : {}", 
> org.apache.xerces.impl.Version.getVersion());
>
> Also, following is the response of *$* *locate xerces* command on the
> server -
>
>
> /usr/hdp/2.6.1.0-129/falcon/client/lib/xercesImpl-2.10.0.jar
>
> /usr/hdp/2.6.1.0-129/hadoop/client/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.1.0-129/hadoop/client/xercesImpl.jar
>
> /usr/hdp/2.6.1.0-129/hadoop-hdfs/lib/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.1.0-129/hbase/lib/xercesImpl-2.9.1.jar
>
>
> /usr/hdp/2.6.1.0-129/hive-hcatalog/share/webhcat/svr/lib/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.1.0-129/livy/jars/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.1.0-129/livy2/jars/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.1.0-129/oozie/lib/xercesImpl-2.10.0.jar
>
> /usr/hdp/2.6.1.0-129/oozie/libserver/xercesImpl-2.10.0.jar
>
> /usr/hdp/2.6.1.0-129/oozie/libtools/xercesImpl-2.10.0.jar
>
> /usr/hdp/2.6.1.0-129/slider/lib/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.1.0-129/spark2/jars/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.1.0-129/storm/contrib/storm-autocreds/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.1.0-129/zookeeper/lib/xercesMinimal-1.9.6.2.jar
>
> /usr/hdp/2.6.3.0-235/falcon/client/lib/xercesImpl-2.10.0.jar
>
> /usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/hadoop/client/xercesImpl.jar
>
> /usr/hdp/2.6.3.0-235/hadoop-hdfs/lib/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/hbase/lib/xercesImpl-2.9.1.jar
>
>
> /usr/hdp/2.6.3.0-235/hive-hcatalog/share/webhcat/svr/lib/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/livy/jars/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/livy2/jars/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/oozie/lib/xercesImpl-2.10.0.jar
>
> /usr/hdp/2.6.3.0-235/oozie/libserver/xercesImpl-2.10.0.jar
>
> /usr/hdp/2.6.3.0-235/oozie/libtools/xercesImpl-2.10.0.jar
>
>
> /usr/hdp/2.6.3.0-235/ranger-admin/ews/webapp/WEB-INF/lib/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/slider/lib/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/spark2/jars/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/storm/contrib/storm-autocreds/xercesImpl-2.9.1.jar
>
> /usr/hdp/2.6.3.0-235/zookeeper/lib/xercesMinimal-1.9.6.2.jar
>
> /usr/hdp/share/hst/hst-common/lib/xercesImpl-2.9.1.jar
>
> Now, I can say that the version of xerces are same.
>
>
> So, what is causing this issue if Xerces version is in sync?
>
>
> I am very excited to discover the issue :)
>
>
> Thanks,
>
> On Mon, Jun 18, 2018 at 6:27 PM Till Rohrmann 
> wrote:
>
>> Could you check which xerces version you have on your classpath?
>> Apparently, it cannot read core-default.xml as Ted pointed out. This 
>> might
>> be the root cause for the failure.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jun 18, 2018 at 1:31 PM Garvit Sharma 
>> wrote:
>>
>>> Hi,
>>>
>>> Sorry for the confusion, but the yarn is running on Hadoop version
>>> 2.7 only and hence I am using Flink 1.5 Hadoop 2.7 binary.
>>>
>>> Below are the details provided by Yarn version command :
>>>
>>> Hadoop 2.7.3.2.6.3.0-235
>>> Subversion g...@github.com:hortonworks/hadoop.git -r
>>> 45bfd33bba8acadfa0e6024c80981c023b28d454
>>> Compiled by jenkins on 2017-10-30T02:31Z
>>> Compiled with protoc 2.5.0
>>> From source with checksum cd1a4a466ef450f547c279989f3aa3
>>> This command was run using
>>> /usr/hdp/2.6.3.0-235/hadoop/hadoop-common-2.7.3.2.6.3.0-235.jar
>>>
>>> Please let me know if you have found the resolution to my issue :)
>>>
>>> Thanks,
>>>
>>>
>>> On Mon, Jun 18, 2018 at 4:50 PM Till Rohrmann 
>>> wrote:
>>>
 Which Hadoop version have you installed? It looks as if Flink has
 been build with Hadoop 2.7 but I see /usr/hdp/2.6.3.0-235 in the class
 path. If you want to run Flink on Hadoop 2.6, then tr

Re: Stream Join With Early firings

2018-06-19 Thread Fabian Hueske
Hi Johannes,

You are right. You should approach the problem with the semantics that you
need before thinking about optimizations such as state size.

The Table API / SQL offers (in v1.5.0) two types of joins:
1) Windowed joins where each record joins with records in a time-range of
the other stream "(A.ts BETWEEN B.ts - 1 hour AND B.ts + 1 hour)"
2) Non-windowed joins, which support arbitrary join predicates but which
fully materialize both inputs. As you mentioned, you can use idle state
retention to remove records from state that have not been accessed for a
certain time.

Best, Fabian
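
For illustration, a time-windowed join in SQL could look like the sketch below
(Orders, OrderPositions and their event-time attributes o_time and p_time are
placeholder names):

SELECT o.o_id, p.pos_id, p.amount
FROM Orders o, OrderPositions p
WHERE o.o_id = p.o_id
  AND p.p_time BETWEEN o.o_time - INTERVAL '1' HOUR AND o.o_time + INTERVAL '1' HOUR

The time predicate bounds how much state each input has to keep; without it the
join falls back to the non-windowed variant that fully materializes both inputs.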

2018-06-18 11:09 GMT+02:00 Johannes Schulte :

> Hi Fabian,
>
> thanks for the hints, though I somehow got the feeling that I am on the
> wrong track given how much code I would need to write for implementing a
> "blueprint" usecase.
>
> Would a join be more simple using the Table API? In the end it's the
> classical Order & OrderPosition example, where the output is an
> upsert-stream. Would I get the expected behaviour (output elements on every
> update on either side of the input stream). I realize that my session
> window approach wasn't driven by the requirements but by operational
> aspects (state size), so using a concept like idle state retention time
> would be a more natural fit.
>
> Thanks,
>
> Johannes
>
> On Mon, Jun 18, 2018 at 9:57 AM Fabian Hueske  wrote:
>
>> Hi Johannes,
>>
>> EventTimeSessionWindows [1] use the EventTimeTrigger [2] as default
>> trigger (see EventTimeSessionWindows.getDefaultTrigger()).
>>
>> I would take the EventTimeTrigger and extend it with early firing
>> functionality.
>> However, there are a few things to consider
>> * you need to be aware that session window can be merged, i.e., two
>> session windows A, B with gap 10: A [20,25), B [37, 45), will be merged
>> when a record at 32 is received.
>> * windows store all records in a list. For every firing, you need to
>> iterate the full list and also track which records you joined already to
>> avoid duplicates. Maybe you can migrate records from the window state into
>> a custom state defined in a ProcessWindowFunction.
>>
>> Best, Fabian
>>
>>
>>
>>
>>
>> 2018-06-13 13:43 GMT+02:00 Johannes Schulte :
>>
>>> Hi,
>>>
>>> I am joining two streams with a session window and want to emit a joined
>>> (early) result for every element arriving on one of the streams.
>>>
>>> Currently the code looks like this:
>>>
>>> s1.join(s2)
>>> .where(s1.id).equalTo(s2.id)
>>> .window(EventTimeSessionWindows.withGap(Time.minutes(15)))
>>> // trigger(?)
>>> .apply(...custom code..)
>>>
>>> What I am missing is the right trigger ala "withEarlyFiring" - do I need
>>> to implement my on trigger for this and if yes, what kind of functionality
>>> must be present to not break the session window semantics?
>>>
>>> Thanks in advance,
>>>
>>> Johannes
>>>
>>>
>>


Re: # of active session windows of a streaming job

2018-06-19 Thread Fabian Hueske
Hi Dongwon,

Do you need the number of active session windows as a DataStream, or would
you like to have it as a metric that you can expose?
If possible, I would recommend exposing it as a metric because metrics are
usually easier to collect.

SessionWindows work internally as follows:
- every new record is added to a new window that starts at the timestamp of
the record and ends at timestamp + gap size. When a record is added to a
window, Trigger.onElement() is called.
- after a window was created, the session window assigner tries to merge
window with overlapping ranges. When windows are merged, Trigger.onMerge()
is called.

In order to track how many session windows exist, we would need to
increment a counter by one when a new window is created (or an element is
assigned to a window, which is equivalent for session windows) and
decrement the counter when windows are merged by the number of merged
windows minus one.

Incrementing the counter is rather easy and can be done in
Trigger.onElement(), either by using state or a Counter metric (Triggers
have access to the metric system).
However, decrementing the counter is difficult. Although the
Trigger.onMerge() method is called, it does not know how many windows were
merged (which is done by the WindowAssigner) and only sees the merged
window. There might be a way to maintain state in a Trigger that allows to
infer how many windows were merged.

Best, Fabian
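
To make the first half concrete, a rough sketch of a trigger that counts window
creations with a Counter metric in onElement() while mirroring the firing
behaviour of the default event-time trigger (decrementing on merges is left
open, as discussed; the metric name is illustrative):

import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CountingEventTimeTrigger extends Trigger<Object, TimeWindow> {

    private transient Counter windowsCreated;

    private Counter counter(TriggerContext ctx) {
        if (windowsCreated == null) {
            windowsCreated = ctx.getMetricGroup().counter("numSessionWindowsCreated");
        }
        return windowsCreated;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // with a session window assigner, every element first lands in its own new window
        counter(ctx).inc();
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // knowing how many windows were merged (to decrement) is the open problem above
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

It would be plugged in via .window(EventTimeSessionWindows.withGap(...)).trigger(new
CountingEventTimeTrigger()), replacing the default trigger.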

2018-06-16 16:39 GMT+02:00 Dongwon Kim :

> Hi Fabian,
>
> I'm still eager to expose # of active sessions as a key metric of our
> service but I haven’t figured it out yet.
>
> First of all, I want to ask you some questions regarding your suggestion.
>
> You could implement a Trigger that fires when a new window is created and
> when the window is closed. A ProcessWindowFunction would emit a +1 if the
> window was created and a -1 when the window is closes.
> Session windows are a bit special, because you also need to handle the
> case of merging windows, i.e., two opened windows can be merged and only
> one (the merged) window is closed. So would need to emit a -2 if a merged
> window was closes (assuming only two windows were merged).
>
> Q1)
> How to fire when a new window is created and when the window is closed?
> AFAIK, we can return TriggerResult only through the three functions:
> onElement, onEventTime, and onProcessingTime.
> Q2)
> Firing is to emit elements in windows down to the window function, not
> emitting values like +1, -1 and -2 which are not in windows.
> Or do I miss something that you meant?
>
> In order to do that, you'd need to carry the merging information forward.
> The Trigger.onMerge method cannot trigger the window function, but it could
> store the merging information in state that is later accessed.
>
> Q3)
> I didn't understand what you mean at all. What do you mean by carrying the
> merging information?
>
> Besides your suggestion, I implemented a custom trigger which is almost
> the same as EventTimeTrigger except the followings:
> - it maintains a variable to count sessions in an instance of a window
> operator
> - it increases the variable by 1 when onElement is invoked
> - it decreases the variable by 1 when onClose is invoked
> Considering the logic of Flink’s session window, it correctly counts
> sessions in an instance of a window operator.
>
> As you might have already noticed, this approach has a critical problem: 
> there's
> no way to maintain an operator state inside a trigger.
> TriggerContext only allows to interact with state that is scoped to the
> window and the key of the current trigger invocation (as shown in
> Trigger#TriggerContext)
>
> Now I've come to a conclusion that it might not be possible using
> DataStream API.
> Otherwise, do I need to think in a totally different way to achieve the
> goal?
>
> Best,
>
> - Dongwon
>
>
>
> 2018. 2. 20. 오후 6:53, Fabian Hueske  작성:
>
> Hi Dongwon Kim,
>
> That's an interesting question.
>
> I don't have a solution blueprint for you, but a few ideas that should
> help to solve the problem.
>
> I would start with a separate job first and later try to integrate it with
> the other job.
> You could implement a Trigger that fires when a new window is created and
> when the window is closed. A ProcessWindowFunction would emit a +1 if the
> window was created and a -1 when the window is closes.
> Session windows are a bit special, because you also need to handle the
> case of merging windows, i.e., two opened windows can be merged and only
> one (the merged) window is closed. So would need to emit a -2 if a merged
> window was closes (assuming only two windows were merged).
> In order to do that, you'd need to carry the merging information forward.
> The Trigger.onMerge method cannot trigger the window function, but it could
> store the merging information in state that is later accessed.
>
> Hope this helps,
> Fabian
>
> 2018-02-20 9:54 GMT+01:00 Dongwon Kim :
>
>> Hi,
>>
>> It could be a totally stupid question but I c

Re: Heap Problem with Checkpoints

2018-06-19 Thread Piotr Nowojski
Hi,

Can you search the logs/std err/std output for log entries like:

log.warn("Failed to locally delete blob “ …) ?

I see in the code that if file deletion fails for whatever reason, 
TransientBlobCleanupTask can loop indefinitely, trying to remove it over and 
over again. That might be ok, however it is doing so without any back-off time, 
as fast as possible.

To confirm this, could you take a couple of thread dumps and check whether some 
thread is spinning in 
org.apache.flink.runtime.blob.TransientBlobCleanupTask#run ?
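
For example, with the JDK's jstack tool (the PID is a placeholder):

jstack <pid-of-the-affected-jvm> | grep -A 20 TransientBlobCleanupTask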

If that's indeed the case, the question would be why file deletion fails.

Piotrek

> On 18 Jun 2018, at 15:48, Fabian Wollert  wrote:
> 
> Hi Piotrek, thanks a lot for your answer and sorry for the late response. I was 
> running some more tests, but I still get the same problem. I had already analyzed a 
> heap dump with VisualVM, and that's how I got the impression that it was some S3 
> logging, but it seems I was wrong. On the newer tests, the heap dump says the 
> following (this time I used Eclipse Memory Analyzer): 
> 
> 
> 
> 
> Are you aware of problems with the BlobServer not cleaning up properly? I 
> also tried using a bigger instance, but it never stabilizes; the heap just keeps 
> increasing (I have already given it 10 GB+ of heap) ...
> 
> Cheers
> 
> --
> 
> Fabian Wollert
> Zalando SE
> 
> E-Mail: fabian.woll...@zalando.de 
> 
> 
> 
> Am Mo., 11. Juni 2018 um 10:46 Uhr schrieb Piotr Nowojski 
> mailto:pi...@data-artisans.com>>:
> Hi,
> 
> What kind of messages are those “logs about S3 operations”? Did you try to 
> google search them? Maybe it’s a known S3 issue?
> 
> Another approach is please use some heap space analyser from which you can 
> backtrack classes that are referencing those “memory leaks” and again try to 
> google any known memory issues.
> 
> It also could just mean, that it’s not a memory leak, but you just need to 
> allocate more heap space for your JVM (and memory consumption will stabilise 
> at some point).
> 
> Piotrek
> 
>> On 8 Jun 2018, at 18:32, Fabian Wollert > > wrote:
>> 
>> Hi, in this email thread 
>> 
>>  here, i tried to set up S3 as a filesystem backend for checkpoints. Now 
>> everything is working (Flink V1.5.0), but the JobMaster is accumulating Heap 
>> space, with eventually killing itself with HeapSpace OOM after several 
>> hours. If I don't enable Checkpointing, then everything is fine. I'm using 
>> the Flink S3 Shaded Libs (tried both the Hadoop and the Presto lib, no 
>> difference in this regard) from the tutorial. my checkpoint settings are 
>> this (job level):
>> 
>> env.enableCheckpointing(1000);
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
>> env.getCheckpointConfig().setCheckpointTimeout(6);
>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>> 
>> Another clue why i suspect the S3 Checkpointing is that the heapspace dump 
>> contains a lot of char[] objects with some logs about S3 operations.
>> 
>> anyone has an idea where to look further on this?
>> 
>> Cheers
>> 
>> --
>> 
>> Fabian Wollert
>> Zalando SE
>> 
>> E-Mail: fabian.woll...@zalando.de
>>  
>> 
>> Tamara-Danz-Straße 1
>> 10243 Berlin
>> Fax: +49 (0)30 2759 46 93
>> E-mail: legalnot...@zalando.co.uk 
>> Notifications of major holdings (Sec. 33, 38, 39 WpHG):  +49 (0)30 2000889349
>> 
>> Management Board:
>> Robert Gentz, David Schneider, Rubin Ritter
>> 
>> Chairman of the Supervisory Board:
>> Lothar Lanz
>> 
>> Person responsible for providing the contents of Zalando SE acc. to Art. 55 
>> RStV [Interstate Broadcasting Agreement]: Rubin Ritter
>> Registered at the Local Court Charlottenburg Berlin, HRB 158855 B
>> VAT registration number: DE 260543043
> 



Re: Flink application does not scale as expected, please help!

2018-06-19 Thread Fabian Hueske
Hi Siew,

The hint about the lower source parallelism compared to the operator
parallelism might be the right one.

Can you check if all source tasks are scheduled to the same machine?
In that case your application might be bottlenecked by the out-going
network connection of that single machine.

Best, Fabian



2018-06-18 17:29 GMT+02:00 Siew Wai Yow :

> Thanks @Fabian for your confirmation about the better performance when
> scaling happened at same TM machine. But it is so funny that it give
> impression "the more I scale the less I get" when the performance drop with
> more TM in play.
>
> @Ovidiu question is interesting to know too. @Till do you mind to share
> your thoughts?
>
> Thank you guys!
>
> --
> *From:* Ovidiu-Cristian MARCU 
> *Sent:* Monday, June 18, 2018 6:28 PM
> *To:* Fabian Hueske
> *Cc:* Siew Wai Yow; Jörn Franke; user@flink.apache.org
> *Subject:* Re: Flink application does not scale as expected, please help!
>
> Hi all,
>
> Allow me to add some comments/questions on this issue that is very
> interesting.
> According to documentation [1] the pipeline example assumes the source is
> running with the same parallelism as successive map operator and the
> workflow optimizes to collocate source and map tasks if possible.
>
> For an application configuring the source with different parallelism,
> assuming N task managers each with m slots, if I configure
> the source operator with parallelism m, then all of the source's tasks
> could be scheduled on the first task manager?
> I think the same story holds for sinks tasks.
> So, in general is there any control over scheduling of source and sink
> tasks?
> Would it be possible to enforce scheduling of source tasks to be balanced
> across task managers? Not sure if this is the default.
> If the source creates a non-keyed stream, can we enforce the source to
> push records to local map tasks?
>
> For Siew’s example, after source#map a keyBy complicates further things
> since each key can be possibly processed on another task manager.
> At least the keyBy operator should run with the same parallelism as source
> and map and be pipelined on same slot (maybe shared slot configuration
> could enforce that).
>
> DataStream AggregatedRecordWithAuditStream = sourceStringStream
> .map(new JsonToRecordTranslator(markerFactory.getMarker(),
> inputlink)).name("JsonRecTranslator").setParallelism(pJ2R)
> .keyBy(new KeySelector() {
> private static final long serialVersionUID = 1L;
> @Override
> public String getKey(Record r) throws Exception {
> return r.getUNIQUE_KEY();
> }
> })
> .process(new ProcessAggregation(aggrDuration, markerFactory.getMarker(),
> markerFactory.getMarker())).setParallelism(pAggr)
> .name("AggregationDuration: " + aggrDuration + "ms");
>
>
> Thanks,
> Ovidiu
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.5/internals/job_scheduling.html
>
>
> On 18 Jun 2018, at 10:05, Fabian Hueske  wrote:
>
> Not sure if TM local assignment is explicitly designed in 1.5.0, but it
> might be an artifact of how slots are registered in the resource manager.
> Till (in CC) should know how that works.
>
> Anyway, tasks that run in the same TM exchange data via in-memory channels
> which is of course much faster than going over the network.
> So yes, a performance drop when tasks are scheduled to different TMs is
> not unexpected IMO.
> You can check that by starting multiple TMs with a single slot each and
> running you job on that setup.
>
> Best, Fabian
>
>
>
> 2018-06-18 9:57 GMT+02:00 Siew Wai Yow :
>
> Hi Fabian,
>
> We are using Flink 1.5.0. Any different in scheduler in Flink 1.5.0?
>
> *"Hence, applications might scale better until tasks are scheduled to
> different machines."*
>
> This seems the case. We have 32 vCPU 16 slots in one TM machine. So the
> scaling work perfectly 1-2-4-8-16 because all happens in same TM. When
> scale to 32 the performance drop, not even in par with case of parallelism
> 16. Is this something expected? Thank you.
>
> Regards,
> Yow
>
> --
> *From:* Fabian Hueske 
> *Sent:* Monday, June 18, 2018 3:47 PM
> *To:* Siew Wai Yow
> *Cc:* Jörn Franke; user@flink.apache.org
>
> *Subject:* Re: Flink application does not scale as expected, please help!
>
> Hi,
>
> Which Flink version are you using?
> Did you try to analyze the bottleneck of the application, i.e., is i

Re: Flink application does not scale as expected, please help!

2018-06-19 Thread Till Rohrmann
Hi,

as Fabian explained, if you exceed the number of slots on a single TM, then
Flink needs to deploy tasks on other TMs which causes a network transfer
between the sources and the mappers. This will have a negative impact if
you compare it to a setup where everything runs locally.

The scheduling in Flink works as follows: Flink will try to co-locate
consumer tasks with their producers in order to avoid costly network
communication. If there are more than 8 producer tasks, then it is assumed
that the consumer can be scheduled everywhere because you might only
achieve to read 1/8 of your input locally.

The question now is how the sources are scheduled, because they determine
how the mappers are scheduled. For tasks with no inputs, Flink will pick a
random slot without preferences for spreading them across as many TMs as
possible or packing them as closely together as possible. The reason is
that both strategies might perform better depending on the actual use case.
Moreover, the way the scheduling works right now, the scheduler does not
know how many sources will be scheduled. This means that you don't
necessarily know how many TMs you have to start in order to execute all
source tasks and, thus, you don't know across how many TMs you have to
spread your sources. Due to the implementation of the SlotManager, which is
responsible for the slot management on the RM, we will first allocate all
slots from a single TM before allocating slots from another TM.

Due to the setup of your test (different parallelism of sources and
mappers), you introduce a shuffle between the sources and the Json mappers.
This is not optimal and I would suggest to instantiate as many sources as
you have mappers. That way these two operators can be chained and run in
the same thread. This would also spread the load of generating events
across the whole cluster instead of letting a single machine be responsible
for producing all events. Moreover, in order to get rid of the measurement
artifacts introduced by switching from local to remote communication you
could start your TMs with a single slot as Fabian suggested. Otherwise you
might see a slight drop whenever you exceed the multiple of 16 by one.

As a side node, if you don't need a shuffle when changing the parallelism
between two operators, then you could also use the rescale command which
tries to minimize the communication between two operators. For example, if
your source has a parallelism of 2 and your mapper of 4, then rescale would
cause the first source to talk to mapper 1 and 2 and the second source to
only talk to mapper 3 and 4.

Cheers,
Till
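
A minimal sketch of that rescale() hint (MySource and MyJsonMapper are
placeholders for the job's own source and JSON-parsing map function):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// source with parallelism 2, JSON mapper with parallelism 4:
// rescale() lets source subtask 1 feed only mappers 1 and 2,
// and source subtask 2 feed only mappers 3 and 4
DataStream<String> source = env
    .addSource(new MySource())
    .setParallelism(2);

source
    .rescale()
    .map(new MyJsonMapper())
    .setParallelism(4);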

On Mon, Jun 18, 2018 at 5:30 PM Siew Wai Yow  wrote:

> Thanks @Fabian for your confirmation about the better performance when
> scaling happened at same TM machine. But it is so funny that it give
> impression "the more I scale the less I get" when the performance drop with
> more TM in play.
>
> @Ovidiu question is interesting to know too. @Till do you mind to share
> your thoughts?
>
> Thank you guys!
>
> --
> *From:* Ovidiu-Cristian MARCU 
> *Sent:* Monday, June 18, 2018 6:28 PM
> *To:* Fabian Hueske
> *Cc:* Siew Wai Yow; Jörn Franke; user@flink.apache.org
> *Subject:* Re: Flink application does not scale as expected, please help!
>
> Hi all,
>
> Allow me to add some comments/questions on this issue that is very
> interesting.
> According to documentation [1] the pipeline example assumes the source is
> running with the same parallelism as successive map operator and the
> workflow optimizes to collocate source and map tasks if possible.
>
> For an application configuring the source with different parallelism,
> assuming N task managers each with m slots, if I configure
> the source operator with parallelism m, then all of the source's tasks
> could be scheduled on the first task manager?
> I think the same story holds for sinks tasks.
> So, in general is there any control over scheduling of source and sink
> tasks?
> Would it be possible to enforce scheduling of source tasks to be balanced
> across task managers? Not sure if this is the default.
> If the source creates a non-keyed stream, can we enforce the source to
> push records to local map tasks?
>
> For Siew’s example, after source#map a keyBy complicates further things
> since each key can be possibly processed on another task manager.
> At least the keyBy operator should run with the same parallelism as source
> and map and be pipelined on same slot (maybe shared slot configuration
> could enforce that).
>
> DataStream AggregatedRecordWithAuditStream = sourceStringStream
> .map(new JsonToRecordTranslator(markerFactory.getMarker(),
> inputlink)).name("JsonRecTranslator").setParallelism(pJ2R)
> .keyBy(new KeySelector() {
> private static final long serialVersionUID = 1L;
> @Override
> public String getKey(Record r) throws Exception {
> return r.getUNIQUE_KEY();
> }
> })
> .process(new ProcessAggregation(aggrDuration, ma