Flink TTL for MapStates and Sideoutputs implementations

2020-05-21 Thread Jaswin Shah
public class CartPGCoprocessFunction extends
        // Note: the generic type parameters were lost in the archive and are reconstructed here
        // from the state descriptors and collector usage; the key type (String) is an assumption.
        KeyedCoProcessFunction<String, CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private static MapState<String, CartMessage> cartState = null;

    /**
     * Map state for pg messages, orderId+mid is key and pgMessage is value.
     */
    private static MapState<String, PaymentNotifyRequestWrapper> pgState = null;

    /**
     * Initializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<>(
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartMessage.class)
        );
        cartState = getRuntimeContext().getMapState(cartStateDescriptor);

        MapStateDescriptor<String, PaymentNotifyRequestWrapper> pgStateDescriptor = new MapStateDescriptor<>(
            Constants.PG_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(PaymentNotifyRequestWrapper.class)
        );
        pgState = getRuntimeContext().getMapState(pgStateDescriptor);
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context,
            Collector<ResultMessage> collector) throws Exception {
        String searchKey = cartMessage.createJoinStringCondition();
        PaymentNotifyRequestWrapper paymentNotifyObject = pgState.get(searchKey);
        if (Objects.nonNull(paymentNotifyObject)) {
            generateResultMessage(cartMessage, paymentNotifyObject, collector);
            pgState.remove(searchKey);
        } else {
            cartState.put(searchKey, cartMessage);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and pg object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context,
            Collector<ResultMessage> collector) throws Exception {
        String searchKey = pgMessage.createJoinStringCondition();
        CartMessage cartMessage = cartState.get(searchKey);
        if (Objects.nonNull(cartMessage)) {
            generateResultMessage(cartMessage, pgMessage, collector);
            cartState.remove(searchKey);
        } else {
            pgState.put(searchKey, pgMessage);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private void generateResultMessage(CartMessage cartMessage,
            PaymentNotifyRequestWrapper pgMessage, Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG())
                    && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if (Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(String.valueOf(cartMessage.getGrandtotal().longValue()));

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage, collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage,
            Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(),
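
Since the subject asks about TTL for these MapStates, here is a minimal sketch (not from the original post) of enabling state TTL on such a descriptor; the 30-minute retention, the descriptor name, and the String value type are assumptions for illustration:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class TtlDescriptorSketch {
    public static MapStateDescriptor<String, String> cartDescriptorWithTtl() {
        // Entries expire 30 minutes (assumed value) after they were last written.
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(30))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>(
                "cartData", // placeholder for Constants.CART_DATA
                TypeInformation.of(String.class),
                TypeInformation.of(String.class)); // value type simplified to String for the sketch
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}

Expired entries are then no longer returned on access; additional cleanup strategies can be configured on the StateTtlConfig builder.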

Re: [ANNOUNCE] release-1.11 branch cut

2020-05-21 Thread Piotr Nowojski
Hi,

Quick update on the release. After the feature freeze, Zhijiang and I were
contacted by quite a few contributors who had features in an "almost done"
state and were asking for permission to merge them after the announced
release branch cut; in most cases we granted this permission.
As those were merged on Tuesday/Wednesday morning, at the moment we are not
anticipating more exemptions, but as before: if you think something
needs to be merged, please reach out to us.

Also, we are preparing the first release candidate build, while at the same
time working hard to resolve all of the blocking bugs/issues for the 1.11
release [1] (help with solving those would be highly appreciated).

Piotrek

[1]
https://issues.apache.org/jira/browse/FLINK-10694?filter=12348362&jql=project%20%3D%20FLINK%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20priority%20in%20(Blocker%2C%20Critical)%20AND%20fixVersion%20%3D%201.11.0%20ORDER%20BY%20Rank%20ASC

On Mon, 18 May 2020 at 13:59 Piotr Nowojski  wrote:

> Hi community,
>
> I have cut the release-1.11 from the master branch based
> on bf5594cdde428be3521810cb3f44db0462db35df commit.
>
> If you are merging something into the master branch, please make sure
> to set the correct fix version in JIRA according to which branch you
> merged your code into. Pay particular attention if you merged something
> to master today (on Monday), as your commit might have ended up before
> or after the release cut.
>
> As a reminder, please do not merge any new features to the release-1.11
> branch without the approval of the release managers (me or Zhijiang). This
> restriction doesn't apply to bug fixes.
>
> Best regards,
> Piotrek
>


Why Flink Connector JDBC doesn't support LocalDateTime?

2020-05-21 Thread forideal
Hello, my friends


  env: Flink 1.10, Blink Planner   
 table source
 CREATE TABLE josn_table (
   order_id VARCHAR,
   event_time TIMESTAMP(3),
   proc_time AS PROCTIME()
 ) WITH (
   'connector.properties.0.key' = 'bootstrap.servers',
   'connector.properties.0.value' = 'localhost:9092',
   'connector.property-version' = '1',
   'connector.startup-mode' = 'earliest-offset',
   'connector.topic' = 'raw',
   'connector.type' = 'kafka',
   'connector.version' = '0.11',
   'format.derive-schema' = 'true',
   'format.property-version' = '1',
   'format.type' = 'json',
   'update-mode' = 'append'
 )
MySQL dim table
CREATE TABLE ilms_box_d_order (
  id VARCHAR,
  event_time TIMESTAMP(3)
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost/mydb',
  'connector.table' = 'test',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.username' = 'test',
  'connector.password' = 'test',
  'connector.property-version' = '1'
);
DML
INSERT INTO console_test
SELECT
  t1.event_time,
  order_id
FROM josn_table
LEFT JOIN ilms_box_d_order FOR SYSTEM_TIME AS OF josn_table.proc_time AS t1
  ON josn_table.order_id = t1.id AND josn_table.event_time = t1.event_time;
When I execute this SQL, I get the following exception.
Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be 
cast to java.sql.Timestamp, field index: 1, field value: 2020-05-22T14:00.
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java#L236
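
For reference, a minimal standalone illustration of the mismatch in the stack trace (this is not the connector's code): the Blink planner hands the runtime a java.time.LocalDateTime, while the JDBC path works with java.sql.Timestamp, and the two need an explicit conversion rather than a cast:

import java.sql.Timestamp;
import java.time.LocalDateTime;

public class TimestampConversion {
    public static void main(String[] args) {
        LocalDateTime eventTime = LocalDateTime.parse("2020-05-22T14:00");
        // A plain cast would fail with the ClassCastException above;
        // the JDK conversion methods have to be used instead.
        Timestamp asSqlTimestamp = Timestamp.valueOf(eventTime);
        LocalDateTime back = asSqlTimestamp.toLocalDateTime();
        System.out.println(asSqlTimestamp + " / " + back);
    }
}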


Why don't we support LocalDateTime? 


Best wishes.
forideal







Re: Writing to SQL server

2020-05-21 Thread Timo Walther

Hi Martin,

usually, this error occurs when people forget to add 
`org.apache.flink.api.scala._` to their imports. It is triggered by the 
Scala macro that the DataStream API uses for extracting types.


Can you try to call `result.toAppendStream[Row]` directly? This should 
work if you import `org.apache.flink.table.api.scala._`.


Maybe this example helps:

https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala

Regards,
Timo


On 22.05.20 08:02, Martin Frank Hansen wrote:

Hi,

I am trying to write input from Kafka to a SQL server on AWS, but I have 
difficulties.


I get the following error: could not find implicit value for evidence 
parameter of type 
org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]

[error]   val dsRow = tableEnv.toAppendStream[Row](result)
[error]                                           ^

Any help is appreciated

I am not sure whether my approach is correct or not but my code is 
as follows:


import java.util.Properties

import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, 
FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.api.common.serialization.{SimpleStringEncoder, 
SimpleStringSchema}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table, 
TableEnvironment, Types}
import org.apache.flink.types.Row

   val properties =new Properties()
   properties.setProperty("bootstrap.servers",b_servers)
   properties.setProperty("zookeeper.connect",zk)
   properties.setProperty("group.id ", "very_small_test")
   properties.setProperty("ssl.endpoint.identification.algorithm ", "")
   properties.setProperty("security.protocol", "SSL")


   val kafkaSource: FlinkKafkaConsumerBase[String] =new 
FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), 
properties).setStartFromTimestamp(0)

   val settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
   val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env, settings)

val schema =new Schema()
 .field("fullVisitorId",Types.STRING)
 .field("eventTime",Types.STRING)
 .field("eventID",Types.STRING)
 .field("eventType",Types.STRING)
 .field("page",Types.MAP( Types.STRING, Types.STRING))
 .field("CustomDimensions",Types.MAP( Types.STRING, Types.STRING))


   tableEnv.connect(new Kafka()
   .version("universal")
   .topic("very_small_test")
   .properties(properties)
   .startFromEarliest()
  )
 .withFormat(
 new Json()
   .failOnMissingField(false)
   .deriveSchema()
   )
 .withSchema(schema)
 .inAppendMode()
 .registerTableSource("sql_source")


val sqlStatement ="SELECT * from sql_source where CustomDimensions['pagePath'] like 
'%BT%'"

val result =tableEnv.sqlQuery(sqlStatement)

   val dsRow =tableEnv.toAppendStream[Row](result)


val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
 .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
 .setDBUrl("AWS url")
.setUsername(username)
 .setPassword(password)
 .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID) VALUES 
(?, ?, ?)")

 .setBatchInterval(100)
 .finish()

   dsRow.writeUsingOutputFormat(jdbcOutput)

tableEnv.execute("SQL test")


--

*Best regards

Martin Hansen*





Writing to SQL server

2020-05-21 Thread Martin Frank Hansen
Hi,

I am trying to write input from Kafka to a SQL server on AWS, but I have
difficulties.

I get the following error: could not find implicit value for evidence
parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
[error]   val dsRow = tableEnv.toAppendStream[Row](result)
[error]   ^

Any help is appreciated

I am not sure whether my approach is correct or not but my code is
as follows:

import java.util.Properties

import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,
FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.api.common.serialization.{SimpleStringEncoder,
SimpleStringSchema}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table,
TableEnvironment, Types}
import org.apache.flink.types.Row

val properties = new Properties()
properties.setProperty("bootstrap.servers",b_servers)
properties.setProperty("zookeeper.connect",zk)
properties.setProperty("group.id", "very_small_test")
properties.setProperty("ssl.endpoint.identification.algorithm ", "")
properties.setProperty("security.protocol", "SSL")


val kafkaSource: FlinkKafkaConsumerBase[String] =
  new FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), properties)
    .setStartFromTimestamp(0)

val settings =
  EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env, settings)

val schema = new Schema()
.field("fullVisitorId",Types.STRING)
.field("eventTime",Types.STRING)
.field("eventID",Types.STRING)
.field("eventType",Types.STRING)
.field("page",Types.MAP( Types.STRING, Types.STRING))
.field("CustomDimensions",Types.MAP( Types.STRING, Types.STRING))


tableEnv.connect(new Kafka()
.version("universal")
.topic("very_small_test")
.properties(properties)
.startFromEarliest()
)
.withFormat(
new Json()
.failOnMissingField(false)
.deriveSchema()
)
.withSchema(schema)
.inAppendMode()
.registerTableSource("sql_source")


val sqlStatement = "SELECT * from sql_source where
CustomDimensions['pagePath'] like '%BT%'"

val result = tableEnv.sqlQuery(sqlStatement)

val dsRow = tableEnv.toAppendStream[Row](result)


val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
.setDBUrl("AWS url")
.setUsername(username)
.setPassword(password)
.setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID)
VALUES (?, ?, ?)")
.setBatchInterval(100)
.finish()

dsRow.writeUsingOutputFormat(jdbcOutput)

tableEnv.execute("SQL test")


-- 



*Best regards
Martin Hansen*


Re: Performance issue when writing to HDFS

2020-05-21 Thread Yun Gao
Hi Kong,

     Sorry, I'm not an expert on Hadoop, but from the logs and some searching, it
seems more likely to be a problem on the HDFS side [1], such as long GC pauses in
the DataNode.

     I have also found a similar issue in the mailing list history [2], and the
conclusion should be similar.

 Best,
 Yun


   [1] 
https://community.cloudera.com/t5/Support-Questions/Solution-for-quot-slow-readprocessor-quot-warnings/td-p/122046
   [2] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/quot-Slow-ReadProcessor-quot-warnings-when-using-BucketSink-td9427.html



 -- Original message --
From: Mu Kong 
Sent: Fri May 22 11:16:32 2020
To: user 
Subject: Performance issue when writing to HDFS

Hi all,

I have a Flink application consuming from Kafka and writing the data to HDFS,
bucketed by event time with BucketingSink.
Sometimes the traffic gets high, and the Prometheus metrics show that the writing
is not stable.

(taken from flink_taskmanager_job_task_operator_numRecordsOutPerSecond)

The output data on HDFS is also getting delayed. (The records for a certain
hour bucket are written to HDFS 50 minutes after that hour.)

I looked into the logs and found warnings regarding the DataNode ack, which might
be related:

DFSClient exception:2020-05-21 10:43:10,432 INFO  
org.apache.hadoop.hdfs.DFSClient  - Exception in 
createBlockOutputStream
java.io.IOException: Got error, status message , ack with firstBadLink as :1004
at 
org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:140)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1478)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1380)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:558)

 Slow ReadProcessor read fields warning:2020-05-21 10:42:30,509 WARN  
org.apache.hadoop.hdfs.DFSClient  - Slow 
ReadProcessor read fields took 30230ms (threshold=3ms); ack: seqno: 126 
reply: SUCCESS reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 372753456 
flag: 0 flag: 0 flag: 0, targets: [DatanodeInfoWithStorage[:1004,DS-833b175e-9848-453d-a222-abf5c05d643e,DISK], 
DatanodeInfoWithStorage[:1004,DS-f998208a-df7b-4c63-9dde-26453ba69559,DISK], 
DatanodeInfoWithStorage[:1004,DS-4baa6ba6-3951-46f7-a843-62a13e3a62f7,DISK]]


We haven't done any tuning for the Flink job regarding writing to HDFS. Is 
there any config or optimization we can try to avoid delay and these warnings?

Thanks in advance!!

Best regards,
Mu

Re: Stateful functions Harness

2020-05-21 Thread Tzu-Li (Gordon) Tai
Hi,

Sorry, I need to correct my comment on using the Kafka ingress / egress
with the Harness.

That is actually doable, by adding an extra dependency to
`statefun-flink-distribution` in your Harness program.
That pulls in all the other dependencies required by the Kafka
ingress / egress, such as the source / sink providers and the Flink Kafka
connectors.

Cheers,
Gordon

On Fri, May 22, 2020 at 12:04 PM Tzu-Li (Gordon) Tai 
wrote:

> Are you getting an exception from running the Harness?
> The Harness should already have the required configurations, such as the
> parent first classloading config.
>
> Otherwise, if you would like to add your own configuration, use the
> `withConfiguration` method on the `Harness` class.
>
> On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
> boris.lublin...@lightbend.com> wrote:
>
>> Also, where do I put flint-conf.yaml in Idea to add additional required
>> config parameter:
>>
>> classloader.parent-first-patterns.additional: 
>> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>>
>>
>>
>> On May 21, 2020, at 12:22 PM, Boris Lublinsky <
>> boris.lublin...@lightbend.com> wrote:
>>
>> Hi,
>> I am trying to run
>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>>  locally
>> using
>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>>
>> And have several questions.
>> 1. It seems fairly straightforward to use it with in memory message
>> generators, but I can’t figure out how to add Kafka ingress/Egress so that
>> I can use it with Kafk
>> 2. GreetingModule already creates StatefulFunctionUniverse  and so does
>> Harness. Is there a way to short circuit it and have Harness get
>> StatefulFunctionUniverse directly
>> 3. Is there an example on how to write Flink main for stageful function?
>> 4. Is there an example anywhere on how to run such examples in the IDE
>> with Kafka?
>> 5 There is a great stateful functions example
>> https://github.com/ververica/flink-statefun-workshop, but its readme
>> does not really describe implementation and neither does this article,
>> referencing it
>> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39.
>> Is there anything that describes this implementation?
>>
>>
>>


Re: Flink Window with multiple trigger condition

2020-05-21 Thread Tzu-Li (Gordon) Tai
Hi,

To achieve what you have in mind, I think what you have to do is to use a
processing time window of 30 mins, and have a custom trigger that matches
the "start" event in the `onElement` method and return
TriggerResult.FIRE_AND_PURGE.

That way, the window fires either when the processing time has passed or
when the start event is received.

Cheers,
Gordon
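
A rough sketch of such a trigger (not from the original reply): it assumes a hypothetical MyEvent type carrying the event name, fires and purges as soon as a "start" event arrives, and otherwise relies on a processing-time timer at the end of the window:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class StartOrTimeoutTrigger extends Trigger<StartOrTimeoutTrigger.MyEvent, TimeWindow> {

    // Hypothetical event type, standing in for the user's record class.
    public static class MyEvent {
        public String eventName;
        public String getEventName() { return eventName; }
    }

    @Override
    public TriggerResult onElement(MyEvent element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        if ("start".equals(element.getEventName())) {
            // Terminate the window early once the "start" event arrives.
            return TriggerResult.FIRE_AND_PURGE;
        }
        // Otherwise make sure the window still fires when its processing time passes.
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}

It would be attached with something like
.window(TumblingProcessingTimeWindows.of(Time.minutes(30))).trigger(new StartOrTimeoutTrigger()).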



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Using Queryable State within 1 job + docs suggestion

2020-05-21 Thread Tzu-Li (Gordon) Tai
This in general is not a good idea, as the state you query using queryable
state within a job does not provide any consistency guarantees at all.

Would it be possible to have some trigger that emits state of the windows,
and join the states downstream?
In general, that is a better approach for what you seem to be trying to
achieve.

Otherwise, when it comes to "querying state across operators", that's a hint
where the Stateful Functions [1] model could maybe be a better fit to your
use case here. Specifically, using Stateful Functions, you would model
"querying state" as a request to the target function, with the target
function sending its state back as a response.

Cheers,
Gordon

[1] https://flink.apache.org/stateful-functions.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Using Queryable State within 1 job + docs suggestion

2020-05-21 Thread Tzu-Li (Gordon) Tai
Hi,

That in general is not a good idea, both because of the problem you mentioned
and because the state you query within the same job using queryable state does
not provide any consistency guarantees.

When it comes to "querying state from another operator", it is a hint that
your use case can potentially be
better modeled using the Stateful Functions framework [1]. With Stateful
Functions, you would model this
as a request message to the target function, with the target function
replying a response carrying its state.
There are still other shortcomings though, for example StateFun currently
doesn't support windowed state yet.

Cheers,
Gordon

[1] https://flink.apache.org/stateful-functions.html

On Thu, May 21, 2020 at 10:25 PM Annemarie Burger <
annemarie.bur...@campus.tu-berlin.de> wrote:

> Hi,
>
> So what I meant was that I have a keyed stream, and from each
> thread/keygroup/PU I want to query the state of the other
> threads/keygroups/PUs.
>
> Does anybody have any experience with this?
>
> I'm currently working on it, and the main problem seems to be that the
> Queryable State Client requires the JobID from which to query the state,
> which in my case would be the same as its own jobID. Any ideas how to
> workaround this?
> Using env.getStreamGraph.getJobGraph.getJobID doesn't seem to work.
>
> Best,
> Annemarie
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Performance impact of many open windows at the same time

2020-05-21 Thread Tzu-Li (Gordon) Tai
Hi Joe,

The main effect this should have is more state to be kept until the windows
can be fired (and state purged).
This would of course increase the time it takes to checkpoint the operator.

I'm not sure if there will be a significant per-record runtime impact caused
by how windows are bookkept in the WindowOperator's data structures;
maybe Aljoscha (cc'ed) can chime in here.
If it is certain that these windows will never fire (until far into the
future) because the event-timestamps are in the first place corrupted, then
it might make sense to have a way to drop windows based on some criteria.
I'm not sure if that is supported in any way without triggers (since you
mentioned that those windows might not receive any data), again Aljoscha
might be able to provide more info here.

Cheers,
Gordon

On Thu, May 21, 2020 at 7:02 PM Joe Malt  wrote:

> Hi all,
>
> I'm looking into what happens when messages are ingested with timestamps
> far into the future (e.g. due to corruption or a wrong clock at the sender).
>
> I'm aware of the effect on watermarking, but another thing I'm concerned
> about is the performance impact of the extra windows this will create.
>
> If a Flink operator has many (perhaps hundreds or thousands) of windows
> open but not receiving any data (and never firing), will this degrade
> performance?
>
> Thanks,
> Joe
>


Re: Stateful functions Harness

2020-05-21 Thread Tzu-Li (Gordon) Tai
Are you getting an exception from running the Harness?
The Harness should already have the required configurations, such as the
parent first classloading config.

Otherwise, if you would like to add your own configuration, use the
`withConfiguration` method on the `Harness` class.
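
For example, a rough sketch (assuming `withConfiguration` accepts a plain key/value pair, as in the statefun-flink-harness-example project; not verified against a specific StateFun version):

import org.apache.flink.statefun.flink.harness.Harness;

public class HarnessRunner {
    public static void main(String[] args) throws Exception {
        new Harness()
                .withConfiguration(
                        "classloader.parent-first-patterns.additional",
                        "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf")
                .start(); // runs the application with whatever ingresses/egresses the modules bind
    }
}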

On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> Also, where do I put flint-conf.yaml in Idea to add additional required
> config parameter:
>
> classloader.parent-first-patterns.additional: 
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>
>
>
> On May 21, 2020, at 12:22 PM, Boris Lublinsky <
> boris.lublin...@lightbend.com> wrote:
>
> Hi,
> I am trying to run
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>  locally
> using
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>
> And have several questions.
> 1. It seems fairly straightforward to use it with in memory message
> generators, but I can’t figure out how to add Kafka ingress/Egress so that
> I can use it with Kafk
> 2. GreetingModule already creates StatefulFunctionUniverse  and so does
> Harness. Is there a way to short circuit it and have Harness get
> StatefulFunctionUniverse directly
> 3. Is there an example on how to write Flink main for stageful function?
> 4. Is there an example anywhere on how to run such examples in the IDE
> with Kafka?
> 5 There is a great stateful functions example
> https://github.com/ververica/flink-statefun-workshop, but its readme does
> not really describe implementation and neither does this article,
> referencing it
> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39. Is
> there anything that describes this implementation?
>
>
>


Re: Stateful functions Harness

2020-05-21 Thread Tzu-Li (Gordon) Tai
Sorry, forgot to cc user@ as well in the last reply.

On Fri, May 22, 2020 at 12:01 PM Tzu-Li (Gordon) Tai 
wrote:

> As an extra note, the utilities you will find in `statefun-e2e-tests`,
> such as the `StatefulFunctionsAppsContainers`, are not yet intended for users.
> This was, however, discussed before. It would be great to hear feedback
> from you on how it works for you if you do decide to give that a try.
>
> On Fri, May 22, 2020 at 11:58 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
>> boris.lublin...@lightbend.com> wrote:
>>
>>> Also, where do I put flint-conf.yaml in Idea to add additional required
>>> config parameter:
>>>
>>> classloader.parent-first-patterns.additional: 
>>> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>>>
>>>
>>>
>>> On May 21, 2020, at 12:22 PM, Boris Lublinsky <
>>> boris.lublin...@lightbend.com> wrote:
>>>
>>> Hi,
>>> I am trying to run
>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>>>  locally
>>> using
>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>>>
>>> And have several questions.
>>> 1. It seems fairly straightforward to use it with in memory message
>>> generators, but I can’t figure out how to add Kafka ingress/Egress so that
>>> I can use it with Kafk
>>>
>>> Could you provide some context on why you would want to do that?
>>
>> The StateFun Flink Harness was not intended to work with the usual
>> shipped ingress / egresses, but purely as a utility for users to run
>> StateFun applications in a consolidated local setup.
>> For testing against Kafka, I would suggest looking at how the StateFun
>> end-to-end tests do it, using testcontainers.
>> The tests are located under `statefun-e2e-tests` module.
>>
>> If you still want to use the Flink Harness for this, you may be able to
>> use the withFlinkSourceFunction function to directly supply the Flink Kafka
>> connector.
>> This only works for the ingress side, though.
>>
>>> 2. GreetingModule already creates StatefulFunctionUniverse  and so does
>>> Harness. Is there a way to short circuit it and have Harness get
>>> StatefulFunctionUniverse directly
>>>
>>> That is not possible. The StatefulFunctionUniverse that the Harness
>> utility provides is always a "mock" one, which contains the defined
>> in-memory ingress and egresses.
>> As previously mentioned, that is because the Flink Harness was intended
>> for running StateFun applications without the need to interact with any
>> other external systems.
>>
>>> 3. Is there an example on how to write Flink main for stageful function?
>>>
>>> At the moment, it is not possible to directly integrate Flink APIs and
>> Stateful Functions APIs in a single job.
>> What do you have in mind for what you want to achieve?
>>
>>> 4. Is there an example anywhere on how to run such examples in the IDE
>>> with Kafka?
>>>
>>> The tests in `statefun-e2e-tests` can be run in the IDE and tests
>> against Kafka. It does require Docker to be available though.
>>
>>> 5 There is a great stateful functions example
>>> https://github.com/ververica/flink-statefun-workshop, but its readme
>>> does not really describe implementation and neither does this article,
>>> referencing it
>>> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39.
>>> Is there anything that describes this implementation?
>>>
>>> I think the bottom half of the article provides some details of the
>> example, including the messaging between functions and a rough sketch of
>> the functions. Maybe its not detailed enough?
>> In particular, what parts of the example would you want to have more
>> details on?
>>
>> Cheers,
>> Gordon
>>
>>
>


Re: Stateful functions Harness

2020-05-21 Thread Tzu-Li (Gordon) Tai
Hi,

On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> Also, where do I put flint-conf.yaml in Idea to add additional required
> config parameter:
>
> classloader.parent-first-patterns.additional: 
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>
>
>
> On May 21, 2020, at 12:22 PM, Boris Lublinsky <
> boris.lublin...@lightbend.com> wrote:
>
> Hi,
> I am trying to run
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>  locally
> using
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>
> And have several questions.
> 1. It seems fairly straightforward to use it with in memory message
> generators, but I can’t figure out how to add Kafka ingress/Egress so that
> I can use it with Kafk
>
> Could you provide some context on why you would want to do that?

The StateFun Flink Harness was not intended to work with the usual shipped
ingress / egresses, but purely as a utility for users to run StateFun
applications in a consolidated local setup.
For testing against Kafka, I would suggest looking at how the StateFun
end-to-end tests do it, using testcontainers.
The tests are located under `statefun-e2e-tests` module.

If you still want to use the Flink Harness for this, you may be able to use
the withFlinkSourceFunction function to directly supply the Flink Kafka
connector.
This only works for the ingress side, though.

> 2. GreetingModule already creates StatefulFunctionUniverse  and so does
> Harness. Is there a way to short circuit it and have Harness get
> StatefulFunctionUniverse directly
>
> That is not possible. The StatefulFunctionUniverse that the Harness
utility provides is always a "mock" one, which contains the defined
in-memory ingress and egresses.
As previously mentioned, that is because the Flink Harness was intended for
running StateFun applications without the need to interact with any other
external systems.

> 3. Is there an example on how to write Flink main for stageful function?
>
> At the moment, it is not possible to directly integrate Flink APIs and
Stateful Functions APIs in a single job.
What do you have in mind for what you want to achieve?

> 4. Is there an example anywhere on how to run such examples in the IDE
> with Kafka?
>
The tests in `statefun-e2e-tests` can be run in the IDE and test against
Kafka. They do require Docker to be available, though.

> 5 There is a great stateful functions example
> https://github.com/ververica/flink-statefun-workshop, but its readme does
> not really describe implementation and neither does this article,
> referencing it
> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39. Is
> there anything that describes this implementation?
>
> I think the bottom half of the article provides some details of the
example, including the messaging between functions and a rough sketch of
the functions. Maybe it's not detailed enough?
In particular, what parts of the example would you want to have more
details on?

Cheers,
Gordon


Re: kerberos integration with flink

2020-05-21 Thread Yangze Guo
Hi, Nick,

From my understanding, if you configure the
"security.kerberos.login.keytab", Flink will add the
AppConfigurationEntry of this keytab to all the apps defined in
"security.kerberos.login.contexts". If you define
"java.security.auth.login.config" at the same time, Flink will also
keep the configuration in it. For more details, see [1][2].

If you want to use this keytab to interact with HDFS, HBase and Yarn,
you need to set "security.kerberos.login.contexts". See [3][4].
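
For example, a minimal keytab-based setup in flink-conf.yaml could look like the following sketch (path, principal and the list of contexts are placeholders for your environment):

security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM
security.kerberos.login.contexts: Client,KafkaClient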

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html#jaas-security-module
[2] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
[3] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html#hadoop-security-module
[4] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java

Best,
Yangze Guo

On Thu, May 21, 2020 at 11:06 PM Nick Bendtner  wrote:
>
> Hi guys,
> Is there any difference in providing kerberos config to the flink jvm using 
> this method in the flink configuration?
>
> env.java.opts:  -Dconfig.resource=qa.conf 
> -Djava.library.path=/usr/mware/flink-1.7.2/simpleapi/lib/ 
> -Djava.security.auth.login.config=/usr/mware/flink-1.7.2/Jaas/kafka-jaas.conf 
> -Djava.security.krb5.conf=/usr/mware/flink-1.7.2/Jaas/krb5.conf
>
> Is there any difference in doing it this way vs providing it from 
> security.kerberos.login.keytab .
>
> Best,
>
> Nick.


Re: How do I get the IP of the master and slave files programmatically in Flink?

2020-05-21 Thread Yangze Guo
Hi, Felipe

I see your problem. IIUC, if you use AbstractUdfStreamOperator, you
could indeed get all the configuration (including what you defined in
flink-conf.yaml) through
"AbstractUdfStreamOperator#getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration()".
However, I guess this is not the intended behavior and it might be fixed in
future versions.

Best,
Yangze Guo
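
For reference, a minimal sketch of reading that configuration from inside a custom operator (the operator and function names here are made up; only the lookup in open() is the point):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class JobManagerAddressAwareOperator<IN>
        extends AbstractUdfStreamOperator<IN, MapFunction<IN, IN>>
        implements OneInputStreamOperator<IN, IN> {

    public JobManagerAddressAwareOperator(MapFunction<IN, IN> userFunction) {
        super(userFunction);
    }

    @Override
    public void open() throws Exception {
        super.open();
        // The task manager's configuration currently also carries the values
        // from flink-conf.yaml, e.g. jobmanager.rpc.address.
        Configuration conf =
                getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration();
        String jobManagerAddress = conf.getString(JobManagerOptions.ADDRESS);
        System.out.println("jobmanager.rpc.address = " + jobManagerAddress);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // Apply the wrapped user function and forward the result downstream.
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}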



On Thu, May 21, 2020 at 3:13 PM Felipe Gutierrez
 wrote:
>
> Hi all,
>
> I would like to have the IP of the JobManager, not the Task Executors.
> I explain why.
>
> I have an operator (my own operator that extends
> AbstractUdfStreamOperator) that sends and receives messages from a
> global controller. So, regardless of which TaskManager these operator
> instances are deployed, they need to send and receive messages from my
> controller. Currently, I am doing this using MQTT broker (this is my
> first approach and I don't know if there is a better way to do it,
> maybe there is...)
>
> The first thing that I do is to start my controller using the
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl and subscribe
> it to the JobManager host. I am getting the IP of the JobManager by
> adding this method on the
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory
> class:
>public String getRpcServiceAddress() {
> return this.rpcService.getAddress();
> }
> That is working. Although I am not sure if it is the best approach.
>
> The second thing that I am doing is to make each operator instance
> publish and subscribe to this controller. To do this they need the
> JobManager IP. I could get the TaskManager IPs from the
> AbstractUdfStreamOperator, but not the JobManager IP. So, I am passing
> the JobManager IP as a parameter to the operator at the moment. I
> suppose that it is easy to get the JobManager IP inside the
> AbstractUdfStreamOperator or simply add some method somewhere to get
> this value. However, I don't know where.
>
> Thanks,
> Felipe
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Thu, May 21, 2020 at 7:13 AM Yangze Guo  wrote:
> >
> > Hi, Felipe
> >
> > Do you mean to get the Host and Port of the task executor where your
> > operator is indeed running on?
> >
> > If that is the case, IIUC, two possible components that contain this
> > information are RuntimeContext and the Configuration param of
> > RichFunction#open. After reading the relevant code path, it seems you
> > could not get it at the moment.
> >
> > Best,
> > Yangze Guo
> >
> > Best,
> > Yangze Guo
> >
> >
> > On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov
> >  wrote:
> > >
> > > Hi Felippe,
> > >
> > > could you clarify in some more details what you are trying to achieve?
> > >
> > > Best regards,
> > >
> > > --
> > >
> > > Alexander Fedulov | Solutions Architect
> > >
> > > +49 1514 6265796
> > >
> > >
> > >
> > > Follow us @VervericaData
> > >
> > > --
> > >
> > > Join Flink Forward - The Apache Flink Conference
> > >
> > > Stream Processing | Event Driven | Real Time
> > >
> > > --
> > >
> > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> > >
> > > --
> > >
> > > Ververica GmbH
> > > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji 
> > > (Tony) Cheng
> > >
> > >
> > >
> > >
> > > On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez 
> > >  wrote:
> > >>
> > >> Hi all,
> > >>
> > >> I have my own operator that extends the AbstractUdfStreamOperator
> > >> class and I want to issue some messages to it. Sometimes the operator
> > >> instances are deployed on different TaskManagers and I would like to
> > >> set some attributes like the master and slave IPs on it.
> > >>
> > >> I am trying to use these values but they only return localhost, not
> > >> the IP configured at flink-conf.yaml file. (jobmanager.rpc.address:
> > >> 192.168.56.1).
> > >>
> > >> ConfigOption restAddressOption = ConfigOptions
> > >>.key("rest.address")
> > >>.stringType()
> > >>.noDefaultValue();
> > >> System.out.println("DefaultJobManagerRunnerFactory rest.address: " +
> > >> jobMasterConfiguration.getConfiguration().getValue(restAddressOption));
> > >> System.out.println("rpcService: " + rpcService.getAddress());
> > >>
> > >>
> > >> Thanks,
> > >> Felipe
> > >>
> > >> --
> > >> -- Felipe Gutierrez
> > >> -- skype: felipe.o.gutierrez
> > >> -- https://felipeogutierrez.blogspot.com


[no subject]

2020-05-21 Thread 王立杰


Re: Stateful functions Harness

2020-05-21 Thread Boris Lublinsky
Also, where do I put flink-conf.yaml in IDEA to add the additional required config
parameter:
classloader.parent-first-patterns.additional: 
org.apache.flink.statefun;org.apache.kafka;com.google.protobuf


> On May 21, 2020, at 12:22 PM, Boris Lublinsky  
> wrote:
> 
> Hi, 
> I am trying to run 
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>  
> 
>  locally
> using 
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>  
> 
>  
> And have several questions.
> 1. It seems fairly straightforward to use it with in memory message 
> generators, but I can’t figure out how to add Kafka ingress/Egress so that I 
> can use it with Kafk
> 2. GreetingModule already creates StatefulFunctionUniverse  and so does 
> Harness. Is there a way to short circuit it and have Harness get 
> StatefulFunctionUniverse directly
> 3. Is there an example on how to write Flink main for stageful function?
> 4. Is there an example anywhere on how to run such examples in the IDE with 
> Kafka?
> 5 There is a great stateful functions example 
> https://github.com/ververica/flink-statefun-workshop 
> , but its readme does 
> not really describe implementation and neither does this article, referencing 
> it https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39 
> . Is 
> there anything that describes this implementation?
> 



Stateful functions Harness

2020-05-21 Thread Boris Lublinsky
Hi, 
I am trying to run
https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
locally using
https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
And have several questions.
1. It seems fairly straightforward to use it with in-memory message generators,
but I can’t figure out how to add a Kafka ingress/egress so that I can use it
with Kafka.
2. GreetingModule already creates StatefulFunctionUniverse and so does
Harness. Is there a way to short circuit it and have Harness get
StatefulFunctionUniverse directly?
3. Is there an example on how to write a Flink main for a stateful function?
4. Is there an example anywhere on how to run such examples in the IDE with 
Kafka?
5. There is a great stateful functions example,
https://github.com/ververica/flink-statefun-workshop, but its readme does not
really describe the implementation, and neither does this article referencing it:
https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39. Is
there anything that describes this implementation?



Re: [EXTERNAL] Re: Memory growth from TimeWindows

2020-05-21 Thread Slotterback, Chris
For those who are interested or googling the mail archives in 8 months, the 
issue was garbage collection related.

The default Java 8 JVM garbage collector (Parallel GC) was being lazy in its
marking and collection phases, letting the heap build to a level that was
causing memory exceptions and stalled TaskManagers. This app has a lot of state,
with memory usage well above 10GB at times. The solution was moving to the G1
collector, which is very aggressive in its young-generation collection by
default at the cost of some CPU usage; it requires some tuning, but keeps the
memory levels much more stable.
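
For reference, the collector switch itself is a one-line JVM option in flink-conf.yaml; this is only a sketch, and the additional G1 tuning flags mentioned above are not shown:

env.java.opts: -XX:+UseG1GC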

On 5/20/20, 9:05 AM, "Slotterback, Chris"  
wrote:

What I've noticed is that heap memory ends up growing linearly with time 
indefinitely (past 24 hours) until it hits the roof of the allocated heap for 
the task manager, which leads me to believe I am leaking somewhere. All of my 
windows have an allowed lateness of 5 minutes, and my watermarks are pulled 
from time embedded in the records using 
BoundedOutOfOrdernessTimestampExtractors. My TumblingEventTimeWindows and 
SlidingEventTimeWindow all use AggregateFunctions, and my intervalJoins use 
ProcessJoinFunctions.

I expect this app to use a significant amount of memory at scale due to the 
288 5-minute intervals in 24 hours, and records being put in all 288 window 
states, and as the application runs for 24 hours memory would increase as all 
288(*unique key) windows build with incoming records, but then after 24 hours 
the memory should stop growing, or at least grow at a different rate?

Also of note, we are using a FsStateBackend configuration, and plan to move 
to RocksDBStateBackend, but from what I can tell, this would only reduce memory 
and delay hitting the heap memory capacity, not stall it forever?

Thanks
Chris


On 5/18/20, 7:29 AM, "Aljoscha Krettek"  wrote:

On 15.05.20 15:17, Slotterback, Chris wrote:
> My understanding is that while all these windows build their memory 
state, I can expect heap memory to grow for the 24 hour length of the 
SlidingEventTimeWindow, and then start to flatten as the t-24hr window frames 
expire and release back to the JVM. What is actually happening is when a 
constant data source feeds the stream, the heap memory profile grows linearly 
past the 24 hour mark. Could this be a result of a misunderstanding of how the 
window’s memory states are kept, or is my assumption correct, and it is more 
likely I have a leak somewhere?

Will memory keep growing indefinitely? That would indicate a bug? What
sort of lateness/watermark settings do you have? What window function do
you use? ProcessWindowFunction, or sth that aggregates?

Side note: with sliding windows of 24h/5min you will have a "write
amplification" of 24*60/5=288, each record will be in 288 windows, which
will each be kept in separate state?

Best,
Aljoscha





kerberos integration with flink

2020-05-21 Thread Nick Bendtner
Hi guys,
Is there any difference in providing kerberos config to the flink jvm using
this method in the flink configuration?

env.java.opts:  -Dconfig.resource=qa.conf
-Djava.library.path=/usr/mware/flink-1.7.2/simpleapi/lib/
-Djava.security.auth.login.config=/usr/mware/flink-1.7.2/Jaas/kafka-jaas.conf
-Djava.security.krb5.conf=/usr/mware/flink-1.7.2/Jaas/krb5.conf

Is there any difference in doing it this way vs providing it from
security.kerberos.login.keytab .

Best,

Nick.


Re: Using Queryable State within 1 job + docs suggestion

2020-05-21 Thread Annemarie Burger
Hi,

So what I meant was that I have a keyed stream, and from each
thread/keygroup/PU I want to query the state of the other
threads/keygroups/PUs. 

Does anybody have any experience with this? 

I'm currently working on it, and the main problem seems to be that the
Queryable State Client requires the JobID from which to query the state,
which in my case would be the same as its own jobID. Any ideas how to
workaround this? 
Using env.getStreamGraph.getJobGraph.getJobID doesn't seem to work. 

Best,
Annemarie 
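
For context, a rough sketch of the client call that needs that JobID (an external query, assuming the state was registered under the name "query-name" with a ValueStateDescriptor; host, port and key are placeholders):

import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryStateSketch {
    public static void main(String[] args) throws Exception {
        // JobID of the running job, e.g. copied from the web UI or REST API.
        JobID jobId = JobID.fromHexString(args[0]);
        QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Long.class);
        CompletableFuture<ValueState<Long>> future = client.getKvState(
                jobId, "query-name", "some-key", BasicTypeInfo.STRING_TYPE_INFO, descriptor);
        System.out.println(future.get().value());
        client.shutdownAndWait();
    }
}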



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Stream Iterative Matching

2020-05-21 Thread Guowei Ma
Hi, Marc
1. I think you should choose which type of window you want to use first
(Tumbling/Sliding/Session). From your description, I think the session
window may not suit your case because there is no gap.
2. >>> how this would work in practise or how to handle the case where
timers fire for data that has already been ejected from the window (as it
has been matched with past data)?
     Do you want to know the lifecycle of an element in the window? You
will know the lifecycle of the window and the elements in it once you have
chosen your window type. For example, an element can be assigned to
multiple sliding windows, and an element ejected from one sliding window
can still be processed in another sliding window [1].
3. I think you could find some examples in the `WindowTranslationTest`.
4. If these window types do not work for your application, you might need
a customized window (trigger/evictor). However, I think you could build a
simple POC with the existing window types first.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/windows.html#sliding-windows
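
As a rough illustration of the sliding-window behaviour in [1] (not from the original reply; SensorReading, the 90-second size and the 30-second slide are placeholder assumptions), each element below ends up in three overlapping windows:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SlidingMatchSketch {

    // Placeholder record type; not from the original thread.
    public static class SensorReading {
        public String sensorId;
        public double value;
        public long timestamp;
    }

    public static DataStream<SensorReading> windowedMax(DataStream<SensorReading> readings) {
        return readings
                .keyBy(r -> r.sensorId)
                .window(SlidingEventTimeWindows.of(Time.seconds(90), Time.seconds(30)))
                // Placeholder aggregation; the actual matching logic would live in a
                // ProcessWindowFunction or AggregateFunction applied here instead.
                .maxBy("value");
    }
}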
Best,
Guowei


ba  wrote on Thu, May 21, 2020 at 4:00 PM:

> Hi Guowei,
>
> Thank you for your reply. Are you able to give some detail on how that
> would
> work with the per window state you linked? I'm struggling to see how the
> logic would work.
>
> I guess something like a session window on a keyed stream (keyed by sensor
> ID). Timers would fire 90 seconds after each element is added to the window
> and then be evaluated? I can't quite think how this would work in practise
> or how to handle the case where timers fire for data that has already been
> ejected from the window (as it has been matched with past data)?
>
> If there are any examples showing similar uses of this function that would
> be great?
>
> Any assistance is very appreciated!
>
> Best,
> Marc
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Using Queryable State within 1 job + docs suggestion

2020-05-21 Thread Yun Tang
Hi Annemarie

Actually, I do not know what exactly PU means in your thread. If you mean the
task manager, then although I haven't tried it, I think we might be able to query
state in the same job. Maybe you could give it a try.

In general, we would initialize the two states in the same operator so that they
can query each other, which provides better performance.

Best
Yun Tang

From: Annemarie Burger 
Sent: Thursday, May 21, 2020 19:45
To: user@flink.apache.org 
Subject: Re: Using Queryable State within 1 job + docs suggestion

Hi,

Thanks for your response!
What if I'm using regular state instead of windowState, is there any way to
use query this state of a PU from another PU in the same Flink job?

Best,
Annemarie



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Window with multiple trigger condition

2020-05-21 Thread aj
A session window is defined on a gap of inactivity; I do not have that
requirement.

Starting the window only on the "*search event*" is a part I will take up later.

Let's say in the first phase I want to start the window on any event that
appears for that user.

For example :

*Scenario -1*
t1 - user1   event1 ( window start)
t1 +5 mins - user1 - event2
t1 + 10 mins --- user1  event3
t1 + 15 mins - user1  event4===start type event (terminate window
as event type "*Start*" arrived and calculate aggregate on above collected
events)

t1+16 mins ---user-1   event 5 starts a new window


*Scenario -2*
t1 - user1   event1 ( window start)
t1 +5 mins - user1 - event2
t1 + 10 mins --- user1  event3
t1 + 30 mins - user1  event4 (terminates the window as 30 mins
elapsed and calculate aggregate on above collected events)

t1+31 mins ---user-1   event5  starts a new window

This is what I want to implement. I have tried to read about triggers but could
not understand how to trigger when either the time has passed or an event with
type *"start"* has arrived. Which functions of the Trigger class do I have to
implement, and how do I check these two conditions on each arriving event?

Please help me implement this. It would be great if you could provide a basic
skeleton of the function that I need to implement; I am not clear on how to start.

On Thu, May 21, 2020 at 4:59 PM Jiayi Liao  wrote:

>
> According to your description, it's more like a session window scenario
> rather than a tumbling window, you can find more details in [1]. But the
> difference between your window and Flink
> 's session window is, the session length is defined by the first element
> in your case. I'm afraid that Flink does't have implementations for such
> scenarios, you may need to create your own WindowAssigner.
>
> For the trigger, yes, you can always implement a trigger to determine the
> lifecyle of a window.
>
>
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
>
>
>
> Best,
> Jiayi Liao
>
> On Thu, May 21, 2020 at 5:51 PM aj  wrote:
>
>> Hi Liao,
>>
>> Thanks for the response. I am reading all this as I never implemented
>> this.
>>
>> > Start the window for the user when event_name: *"search"  *arrived for
>> the user.
>>
>> I'm not very sure this is right way to define the window in your business
>> if you use event time, because there may exist out-of-order events that
>> happened after "search" event, but arrive before "search" event, which will
>> be discarded because you haven't assigned a window. (If I understand
>> correctly)
>>
>> *Yes you are right and I raised this concern to the business team and we
>> still in discussion. *
>>
>> But let say if I do not need the above condition if I want to start the
>> window whenever the first event of the particular user event appears and
>> then bucket those events with similar conditions (either 30 mins from the
>> start of the window reached or event_type: *"start" *is appeared). So,
>> in that case, can I use *TumblingProcessingTimeWindows *with 30 mins,
>> and on that can I put a custom trigger that before 30 mins if event_type: 
>> *"start"
>> is *arrived than the process the window.
>> Is this correct understanding like if let stay *start* event arrived at
>> 20 mins from window start then that window will be close and processed and
>> events that arriving after that will be assign to the new window or window
>> will continue till 30 mins.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Thu, May 21, 2020 at 2:55 PM Jiayi Liao 
>> wrote:
>>
>>> Hi,
>>>
>>>
>>> > Start the window for the user when event_name: *"search"  *arrived
>>> for the user.
>>>
>>> I'm not very sure this is right way to define the window in your
>>> business if you use event time, because there may exist out-of-order events
>>> that happened after "search" event, but arrive before "search" event, which
>>> will be discarded because you haven't assigned a window. (If I understand
>>> correctly)
>>>
>>> Back to the problem, I think you can implement your own #WindowAssigner,
>>> which will create a window based on the event's name. Take a look at our
>>> existing implementations like #TumblingEventWindows.
>>>
>>> > Trigger the window when either 30 mins from the start of
>>> the window reached or event_type : *"start" *is appeared
>>>
>>> This can also be implemented with a customed #Trigger. The timing of
>>> being triggered can be set by registering timers with Flink's internal
>>> timer service. Take a look at #EventTimeTrigger, it's easy to implement it.
>>>
>>>
>>> Best,
>>> Jiayi Liao
>>>
>>> On Thu, May 21, 2020 at 2:47 PM aj  wrote:
>>>

 Hello All,

 I am getting a lot of user events in a  stream. There are different
 types of events, now I want to build some aggregation metrics for the user
 by grouping events in buckets.

 My condition for windowing is :

 1. Start the window for the user when event_nam

Re: Flink on Kubernetes

2020-05-21 Thread Yang Wang
Hi Ivan Yang,

#1. If a TaskManager crashes exceptionally and there are running tasks on it,
it cannot join back gracefully. Whether the full job will be restarted depends
on the failover strategy [1].

#2. Currently, when new TaskManagers join the Flink cluster, the running Flink
job cannot rescale automatically. You need to stop the job with a savepoint and
restart it manually. The community is still working on this, and you can find
more information in this ticket [2].

[1].
https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html#failover-strategies
[2]. https://issues.apache.org/jira/browse/FLINK-10407
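
For reference, the failover strategy from [1] is chosen in flink-conf.yaml; a minimal sketch (pick the value that matches your setup) would be:

jobmanager.execution.failover-strategy: region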

Best,
Yang

Ivan Yang  wrote on Thu, May 21, 2020 at 3:01 PM:

> Hi,
>
> I have setup Filnk 1.9.1 on Kubernetes on AWS EKS with one job manager
> pod, 10 task manager pods, one pod per EC2 instance. Job runs fine. After a
> while, for some reason, one pod (task manager) crashed, then the pod
> restarted. After that, the job got into a bad state. All the parallelisms
> are showing different color (orange, purple) on the console. I had to
> basically stop the entire job. My question is should a task manager restart
> affect the entire cluster/job? Or should it join back gracefully?
>
> Second question is regarding to auto scaling Flink cluster on kubernetes.
> If I add more nodes/pods (task manager containers) to the cluster, will a
> running Flink job redistribute load to the additional resources or I have
> to stop to a savepoint, and restart the job?
>
> Thanks and regards.
> Ivan


Re: Using Queryable State within 1 job + docs suggestion

2020-05-21 Thread Annemarie Burger
Hi,

Thanks for your response!
What if I'm using regular state instead of windowState? Is there any way to
query this state of one PU from another PU in the same Flink job?

Best,
Annemarie



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Performance impact of many open windows at the same time

2020-05-21 Thread Joe Malt
Hi all,

I'm looking into what happens when messages are ingested with timestamps
far into the future (e.g. due to corruption or a wrong clock at the sender).

I'm aware of the effect on watermarking, but another thing I'm concerned
about is the performance impact of the extra windows this will create.

If a Flink operator has many (perhaps hundreds or thousands) of windows
open but not receiving any data (and never firing), will this degrade
performance?

Thanks,
Joe


Re: Stream Iterative Matching

2020-05-21 Thread ba
Hi Guowei,

Thank you for your reply. Are you able to give some detail on how that would
work with the per window state you linked? I'm struggling to see how the
logic would work.

I guess something like a session window on a keyed stream (keyed by sensor
ID). Timers would fire 90 seconds after each element is added to the window
and then be evaluated? I can't quite think how this would work in practise
or how to handle the case where timers fire for data that has already been
ejected from the window (as it has been matched with past data)?

If there are any examples showing similar uses of this function that would
be great?

Any assistance is very appreciated!

Best,
Marc





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How do I get the IP of the master and slave files programmatically in Flink?

2020-05-21 Thread Felipe Gutierrez
Hi all,

I would like to have the IP of the JobManager, not the Task Executors.
I explain why.

I have an operator (my own operator that extends
AbstractUdfStreamOperator) that sends and receives messages from a
global controller. So, regardless of which TaskManager these operator
instances are deployed, they need to send and receive messages from my
controller. Currently, I am doing this using MQTT broker (this is my
first approach and I don't know if there is a better way to do it,
maybe there is...)

The first thing that I do is to start my controller using the
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl and subscribe
it to the JobManager host. I am getting the IP of the JobManager by
adding this method on the
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory
class:
   public String getRpcServiceAddress() {
return this.rpcService.getAddress();
}
That is working. Although I am not sure if it is the best approach.

The second thing that I am doing is to make each operator instance
publish and subscribe to this controller. To do this they need the
JobManager IP. I could get the TaskManager IPs from the
AbstractUdfStreamOperator, but not the JobManager IP. So, I am passing
the JobManager IP as a parameter to the operator at the moment. I
suppose that it is easy to get the JobManager IP inside the
AbstractUdfStreamOperator or simply add some method somewhere to get
this value. However, I don't know where.

Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Thu, May 21, 2020 at 7:13 AM Yangze Guo  wrote:
>
> Hi, Felipe
>
> Do you mean to get the Host and Port of the task executor where your
> operator is indeed running on?
>
> If that is the case, IIUC, two possible components that contain this
> information are RuntimeContext and the Configuration param of
> RichFunction#open. After reading the relevant code path, it seems you
> could not get it at the moment.
>
> Best,
> Yangze Guo
>
> Best,
> Yangze Guo
>
>
> On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov
>  wrote:
> >
> > Hi Felippe,
> >
> > could you clarify in some more details what you are trying to achieve?
> >
> > Best regards,
> >
> > --
> >
> > Alexander Fedulov | Solutions Architect
> >
> > +49 1514 6265796
> >
> >
> >
> > Follow us @VervericaData
> >
> > --
> >
> > Join Flink Forward - The Apache Flink Conference
> >
> > Stream Processing | Event Driven | Real Time
> >
> > --
> >
> > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> >
> > --
> >
> > Ververica GmbH
> > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji 
> > (Tony) Cheng
> >
> >
> >
> >
> > On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez 
> >  wrote:
> >>
> >> Hi all,
> >>
> >> I have my own operator that extends the AbstractUdfStreamOperator
> >> class and I want to issue some messages to it. Sometimes the operator
> >> instances are deployed on different TaskManagers and I would like to
> >> set some attributes like the master and slave IPs on it.
> >>
> >> I am trying to use these values but they only return localhost, not
> >> the IP configured at flink-conf.yaml file. (jobmanager.rpc.address:
> >> 192.168.56.1).
> >>
> >> ConfigOption restAddressOption = ConfigOptions
> >>.key("rest.address")
> >>.stringType()
> >>.noDefaultValue();
> >> System.out.println("DefaultJobManagerRunnerFactory rest.address: " +
> >> jobMasterConfiguration.getConfiguration().getValue(restAddressOption));
> >> System.out.println("rpcService: " + rpcService.getAddress());
> >>
> >>
> >> Thanks,
> >> Felipe
> >>
> >> --
> >> -- Felipe Gutierrez
> >> -- skype: felipe.o.gutierrez
> >> -- https://felipeogutierrez.blogspot.com


Flink on Kubernetes

2020-05-21 Thread Ivan Yang
Hi,

I have set up Flink 1.9.1 on Kubernetes on AWS EKS with one job manager pod, 10
task manager pods, one pod per EC2 instance. Job runs fine. After a while, for 
some reason, one pod (task manager) crashed, then the pod restarted. After 
that, the job got into a bad state. All the parallelisms are showing different 
color (orange, purple) on the console. I had to basically stop the entire job. 
My question is should a task manager restart affect the entire cluster/job? Or 
should it join back gracefully?

My second question is regarding auto-scaling a Flink cluster on Kubernetes. If I
add more nodes/pods (task manager containers) to the cluster, will a running
Flink job redistribute load to the additional resources, or do I have to stop to a
savepoint and restart the job?

Thanks and regards.
Ivan