How to create User Notifications/Reminder ?
I need to create User Notification/Reminder when I don’t see a specific event (high volume) from that user for more than 2 weeks. Should I be using windowing or CEP or ProcessFunction? I am pretty new to Flink. Can anyone please advise me what is the best way to solve this? Thank you for your time.
Re: How to create User Notifications/Reminder ?
Hi Hequn,

I was more interested in solving this using CEP. I want to have a window of 2 weeks and create the Notification/Reminder in the timeout handler. Is this doable in Flink 1.4.2?

Thanks

On Wed, Jul 11, 2018 at 6:14 PM, Hequn Cheng wrote:
> Hi shyla,
>
> The same question [1] was asked two days ago. Maybe it is helpful for you. Let me know if you have any other concerns.
>
> Best, Hequn
>
> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-trigger-a-function-on-the-state-periodically-td21311.html
Re: How to create User Notifications/Reminder ?
Thanks a lot Dawid and Hequn. Dawid, the link you provided is very useful.

Thanks
shyla

On Thu, Jul 12, 2018 at 5:59 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
> Hi shyla,
>
> It should be doable with CEP. You can create a pattern like
> Pattern.begin("start").next/followedBy("end").where(...).within(/* two weeks */)
> and subscribe for timed-out events. You can check a very similar example here [1].
>
> Best,
> Dawid
>
> [1] https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/cep/LongRidesExercise.java
>
> On 12/07/18 14:26, Hequn Cheng wrote:
>> Hi shyla,
>>
>> As for windows, I think they are not very convenient here. A two-week window is used to process data from the most recent 2 weeks, while you want to process data beyond 2 weeks.
>> I'm not familiar with CEP, but it sounds like a good idea.
>>
>> Best, Hequn
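The pattern Dawid outlines could look roughly like the sketch below. It assumes the Flink 1.4 Scala CEP API, where `select` takes a timeout function and a select function and returns an `Either`; later releases route timed-out matches to a side output instead. `UserEvent`, the sample records and the reminder strings are hypothetical and only illustrate the shape of the job.

```scala
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

import scala.collection.Map

// Hypothetical event type: one record per occurrence of the event of interest.
case class UserEvent(userId: String, timestamp: Long)

object InactivityReminderCep {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Stand-in for the real source (assumption); timestamps are ascending.
    val events: DataStream[UserEvent] = env
      .fromElements(UserEvent("u1", 0L), UserEvent("u2", 500L), UserEvent("u1", 1000L))
      .assignAscendingTimestamps(_.timestamp)

    // Every event opens a partial match that completes when the same user
    // sends another event within two weeks.
    val pattern = Pattern
      .begin[UserEvent]("start")
      .followedBy("end")
      .within(Time.days(14))

    val patternStream = CEP.pattern(events.keyBy(_.userId), pattern)

    // Left = partial match that timed out (no follow-up event in two weeks),
    // Right = completed match (the user is still active).
    val result: DataStream[Either[String, String]] = patternStream.select {
      (partial: Map[String, Iterable[UserEvent]], timeoutTimestamp: Long) =>
        s"Reminder for user ${partial("start").head.userId}"
    } {
      (matched: Map[String, Iterable[UserEvent]]) =>
        s"User ${matched("start").head.userId} is still active"
    }

    // Keep only the timed-out side, i.e. the reminders.
    result
      .flatMap((e: Either[String, String]) => e.left.toOption.toList)
      .print()

    env.execute("Inactivity reminder (CEP sketch)")
  }
}
```

In event time, a partial match only times out once the watermark passes the two-week bound, so reminders for a silent user depend on other events still advancing the watermark.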
Event Time Session Window does not trigger..
Hi,

I used PopularPlacesFromKafka from dataartisans.flinktraining.exercises as the basis. I made very minor changes and the session window is not triggered. If I use ProcessingTime instead of EventTime it works. Here is my code. Appreciate any help.

Thanks

    object KafkaEventTimeWindow {

      private val LOCAL_ZOOKEEPER_HOST = "localhost:2181"
      private val LOCAL_KAFKA_BROKER = "localhost:9092"
      private val CON_GROUP = "KafkaEventTimeSessionWindow"
      private val MAX_EVENT_DELAY = 60 // events are out of order by max 60 seconds

      def main(args: Array[String]) {

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        val kafkaProps = new Properties
        kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST)
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER)
        kafkaProps.setProperty("group.id", CON_GROUP)
        kafkaProps.setProperty("auto.offset.reset", "earliest")

        val consumer = new FlinkKafkaConsumer011[PositionEventProto](
          "positionevent",
          new PositionEventProtoSchema,
          kafkaProps)
        consumer.assignTimestampsAndWatermarks(new PositionEventProtoTSAssigner)

        val posstream = env.addSource(consumer)

        def convtoepochmilli(cdt: String): Long = {
          val odt:OffsetDateTime = OffsetDateTime.parse(cdt);
          val i:Instant = odt.toInstant();
          val millis:Long = i.toEpochMilli();
          millis
        }

        val outputstream = posstream
          .mapWith{case(p) => (p.getConsumerUserId, convtoepochmilli(p.getCreateDateTime.getInIso8601Format))}
          .keyBy(0)
          .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
          .reduce { (v1, v2) => (v1._1, Math.max(v1._2 , v2._2)) }

        outputstream.print()

        // execute the transformation pipeline
        env.execute("Output Stream")
      }

    }

    class PositionEventProtoTSAssigner
      extends BoundedOutOfOrdernessTimestampExtractor[PositionEventProto](Time.seconds(60)) {

      override def extractTimestamp(pos: PositionEventProto): Long = {
        val odt:OffsetDateTime = OffsetDateTime.parse(pos.getCreateDateTime.getInIso8601Format);
        val i:Instant = odt.toInstant();
        val millis:Long = i.toEpochMilli();
        millis
      }
    }
Re: Event Time Session Window does not trigger..
Hi Hequn,

I now realize that in production, data will not be a problem since this will be a high-volume Kafka topic. So, I will go with EventTime.

Still, I would like to know if I can use both TimeCharacteristic.ProcessingTime and TimeCharacteristic.EventTime in an application.

Thanks, the link you provided saved my time.

-shyla

On Sun, Aug 5, 2018 at 9:28 AM, anna stax wrote:
> Hi Hequn,
>
> Thanks for the link. Looks like I had better use ProcessingTime instead of EventTime, especially because of the 4th reason you listed:
> "Data should cover a longer time span than the window size to advance the event time."
> I need the trigger when the data stops.
>
> I have one more question.
>
> Can I set the TimeCharacteristic at the stream level instead of at the application level?
> Can I use both TimeCharacteristic.ProcessingTime and TimeCharacteristic.EventTime in an application?
>
> Thank you
>
> On Sat, Aug 4, 2018 at 10:05 PM, Hequn Cheng wrote:
>> Hi shyla,
>>
>> I answered a similar question on Stack Overflow [1]; you can take a look first.
>>
>> Best, Hequn
>>
>> [1] https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger
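The quoted reason, "Data should cover a longer time span than the window size to advance the event time," can be worked through with the numbers from the posted code (60-second session gap, 60-second out-of-orderness bound). The following is a simplified model in plain Scala, not Flink code: it assumes the watermark equals the largest timestamp seen minus the bound, and that an event-time window fires once the watermark reaches the window end minus 1 ms. The object and method names are made up for illustration.

```scala
object SessionWatermarkDemo {
  val GapMs = 60000L              // EventTimeSessionWindows.withGap(Time.seconds(60))
  val MaxOutOfOrdernessMs = 60000L // bound passed to the timestamp extractor

  // Simplified watermark: largest timestamp seen so far minus the bound.
  def watermark(seenTimestamps: Seq[Long]): Long =
    seenTimestamps.max - MaxOutOfOrdernessMs

  // A session whose last event is at lastEventInSession covers
  // [first, lastEventInSession + gap); it fires once the watermark
  // reaches the window's last millisecond.
  def sessionFires(lastEventInSession: Long, seenTimestamps: Seq[Long]): Boolean =
    watermark(seenTimestamps) >= lastEventInSession + GapMs - 1

  def main(args: Array[String]): Unit = {
    val session = Seq(0L, 10000L, 20000L)

    // No later data: the watermark is -40000, far below 79999.
    println(sessionFires(20000L, session)) // prints false

    // A later event at 140000 ms lifts the watermark to 80000.
    println(sessionFires(20000L, session :+ 140000L)) // prints true
  }
}
```

Under these assumptions a session is only emitted after some event arrives roughly two minutes (gap plus bound) later than the session's last event, which is why the window never fires when the data simply stops.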
Re: Event Time Session Window does not trigger..
Hi Hequn and Fabian,

Thanks. Appreciate your help.

On Mon, Aug 6, 2018 at 1:32 AM, Fabian Hueske wrote:
> Hi,
>
> By setting the time characteristic to EventTime, you enable the internal handling of record timestamps and watermarks.
> In contrast to EventTime, ProcessingTime does not require any additional data.
>
> You can use both EventTime and ProcessingTime in the same application and StreamExecutionEnvironment.
> However, if you enable EventTime, this will be the default mode in some API methods that create time-based operators, and you will need to explicitly create ProcessingTime operators if you want to work in ProcessingTime.
> For example, the stream.keyBy().timeWindow(Time.minutes(1)) shortcut would create an EventTime tumbling window if the TimeCharacteristic is set to EventTime and a ProcessingTime tumbling window if it is ProcessingTime.
>
> Best,
> Fabian
>
> 2018-08-06 4:30 GMT+02:00 Hequn Cheng:
>> Hi anna, shyla
>>
>> Calling env.setStreamTimeCharacteristic sets the time characteristic for all streams created from this environment. So if your application contains multiple environments, then yes.
>>
>> Best, Hequn
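Fabian's point about explicitly creating ProcessingTime operators can be sketched as follows. This assumes the Flink 1.x Scala DataStream API used elsewhere in the thread; the tuple stream is a hypothetical stand-in for the Kafka source, and the reduce simply mirrors the one in the posted job.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, ProcessingTimeSessionWindows}
import org.apache.flink.streaming.api.windowing.time.Time

object MixedTimeWindows {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Enables timestamp and watermark handling; shortcuts such as
    // timeWindow(...) now default to event time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Hypothetical (userId, epochMillis) records with ascending timestamps.
    val events: DataStream[(String, Long)] = env
      .fromElements(("user-1", 1000L), ("user-2", 1500L), ("user-1", 2000L))
      .assignAscendingTimestamps(_._2)

    // Event-time sessions: fire when the watermark passes the session end.
    val byEventTime = events
      .keyBy(_._1)
      .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
      .reduce((a, b) => (a._1, math.max(a._2, b._2)))

    // Processing-time sessions in the same job: the assigner is named
    // explicitly, so the environment's time characteristic does not apply.
    val byProcessingTime = events
      .keyBy(_._1)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
      .reduce((a, b) => (a._1, math.max(a._2, b._2)))

    byEventTime.print()
    byProcessingTime.print()

    env.execute("Mixed time windows")
  }
}
```

Because the window assigner is chosen per operator, both variants can share one StreamExecutionEnvironment. With a short bounded input like this one, the processing-time sessions may not fire before the job finishes; against a continuous source they close after 60 seconds of wall-clock inactivity per key.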
docker, error NoResourceAvailableException..
Hello all,

I am trying to use Docker as a single-node Flink cluster.

    docker run --name flink_local -p 8081:8081 -t flink local

I submitted a job to the cluster using the Web UI. The job failed. I see this error message in the docker logs.

    org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 30 ms. Slots required: 2, slots allocated: 0

The Web UI shows 0 taskmanagers and 0 task slots on the Flink dashboard.

How do I start the Docker container with 2 task slots?

Appreciate any help.

Thanks
Re: docker, error NoResourceAvailableException..
Thanks Dominik, I will try that.

On Wed, Aug 15, 2018 at 3:10 AM, Dominik Wosiński wrote:
> Hey,
>
> The problem is that your command starts the Job Manager container, but it does not start a Task Manager. That is why you have 0 slots. Currently, the default numberOfTaskSlots is set to the number of CPUs available on the machine.
>
> So you can generally do two things:
>
> 1) Start a Job Manager and 2 Task Managers. If you have Docker Compose available, you can paste this into your docker-compose.yml:
>
>     services:
>       jobmanager:
>         image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>         expose:
>           - "6123"
>         ports:
>           - "8081:8081"
>         command: jobmanager
>         environment:
>           - JOB_MANAGER_RPC_ADDRESS=jobmanager
>
>       taskmanager:
>         image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>         expose:
>           - "6121"
>           - "6122"
>         depends_on:
>           - jobmanager
>         command: taskmanager
>         links:
>           - "jobmanager:jobmanager"
>         environment:
>           - JOB_MANAGER_RPC_ADDRESS=jobmanager
>
>       taskmanager1:
>         image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>         expose:
>           - "6190"
>           - "6120"
>         depends_on:
>           - jobmanager
>         command: taskmanager
>         links:
>           - "jobmanager:jobmanager"
>         environment:
>           - JOB_MANAGER_RPC_ADDRESS=jobmanager
>
> This will give you 1 Job Manager and 2 Task Managers with one task slot each, so 2 task slots in total.
>
> 2) You can deploy 1 Job Manager and 1 Task Manager. Then you need to modify flink-conf.yaml and set the following setting:
>
>     taskmanager.numberOfTaskSlots: 2
>
> This will give you 2 task slots with only 1 Task Manager. But you will need to somehow override the config in the container, possibly using volumes: https://docs.docker.com/storage/volumes/
>
> Regards,
> Dominik.
When using Flink for CEP, can the data in Cassandra database be used for state
Hello all,

I am new to Flink. We have our data in a Cassandra database, and we have a use case for CEP. I am checking whether Flink fits well for us.

When processing the event data, I may want to pull data from the Cassandra database, such as the user profile, and join it with the event data. Is there a way to do that?

I appreciate your help.

Thanks