Hi Averell

I think this is because, after the 'union', the input stream no longer follows
the rule that it must be pre-partitioned in EXACTLY the same way Flink's keyBy
would partition the data [1]. An easy way to verify this is to do what [2]
does: check whether each sub-task of the union stream contains exactly the keys
that the corresponding downstream task expects to own.
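To make the rule in [1] concrete, here is a minimal pure-Scala sketch of how, as far as I understand, keyBy routes a key: key.hashCode() is scrambled with a murmur hash, taken modulo the max parallelism to get a key group, and the key group is mapped to a subtask index. The object KeyGroupCheck and all its method names are hypothetical; the formulas are meant to mirror KeyGroupRangeAssignment and MathUtils.murmurHash, but please double-check them against the Flink version you run. The `misrouted` helper is a check in the spirit of [2]: given each record's key and the subtask it actually arrived at, it returns the records a keyed operator would not own.

```scala
// Hypothetical helper (names are illustrative, not Flink API).
// Intended to mirror Flink's KeyGroupRangeAssignment and MathUtils.murmurHash;
// verify against the Flink version you actually run.
object KeyGroupCheck {

  // Murmur3 constants written as Long literals and narrowed to Int,
  // so they wrap to the same 32-bit values Java uses.
  private val C1 = 0xcc9e2d51L.toInt
  private val C2 = 0x1b873593L.toInt
  private val C3 = 0xe6546b64L.toInt
  private val M1 = 0x85ebca6bL.toInt
  private val M2 = 0xc2b2ae35L.toInt

  // Final avalanche step (murmur3 fmix32).
  private def bitMix(in0: Int): Int = {
    var in = in0
    in ^= in >>> 16
    in *= M1
    in ^= in >>> 13
    in *= M2
    in ^= in >>> 16
    in
  }

  // Scrambles a hashCode and returns a non-negative Int.
  def murmurHash(code0: Int): Int = {
    var code = code0
    code *= C1
    code = Integer.rotateLeft(code, 15)
    code *= C2
    code = Integer.rotateLeft(code, 13)
    code = code * 5 + C3
    code ^= 4
    code = bitMix(code)
    if (code >= 0) code
    else if (code != Int.MinValue) -code
    else 0
  }

  // key -> key group in [0, maxParallelism).
  def keyGroupFor(key: Any, maxParallelism: Int): Int =
    murmurHash(key.hashCode()) % maxParallelism

  // key group -> index of the subtask that owns it, in [0, parallelism).
  def subtaskForKeyGroup(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

  // The subtask keyBy would send this key to.
  def expectedSubtask(key: Any, maxParallelism: Int, parallelism: Int): Int =
    subtaskForKeyGroup(keyGroupFor(key, maxParallelism), maxParallelism, parallelism)

  // Given (key, subtask the record actually arrived at), keep the records
  // whose subtask differs from the one keyBy would have chosen.
  def misrouted[K](placed: Seq[(K, Int)], maxParallelism: Int, parallelism: Int): Seq[(K, Int)] =
    placed.filter { case (key, subtask) =>
      expectedSubtask(key, maxParallelism, parallelism) != subtask
    }
}
```

With parallelism 1 every key maps to subtask 0, so nothing can be misrouted; with a higher parallelism, any record that did not pass through a keyBy edge right before the keyed operator (for example one simply forwarded from whichever subtask produced it) may show up in `misrouted`. A max parallelism of 128 is a reasonable value to try for small jobs, but that is an assumption; use whatever your job is actually configured with.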

Best
Yun Tang


[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
[2] https://github.com/apache/flink/blob/4e505c67542a45c82c763c12099bdfc621c9e476/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java#L223

________________________________
From: Averell <lvhu...@gmail.com>
Sent: Sunday, May 5, 2019 16:43
To: user@flink.apache.org
Subject: IllegalArgumentException with CEP & reinterpretAsKeyedStream

Hi everyone,

I have a big stream A, filtered by flags from a small stream B, then unioned
with another stream C to become the input for my CEP.
As the three streams A, B, C are all keyed, I expected that the output
stream resulting from connecting/unioning them would also be keyed, so I
used /reinterpretAsKeyedStream/ before passing it to CEP. With this, I
got an /IllegalArgumentException/ (full stack trace below).
If I use parallelism of 1, or if I don't use reinterpretAsKeyedStream (and
use /keyBy/ manually), then there's no such exception.

I don't know how to debug this error, and I'm not sure whether I should be
using keyed streams with CEP at all.

Thanks and best regards,
Averell


My code:
    val cepInput = streamA.keyBy(r => (r.id1, r.id2))
      .connect(streamB.keyBy(r => (r.id1, r.id2)))
      .flatMap(new MyCandidateFilterFunction())
      .union(streamC.keyBy(r => (r.id1, r.id2)))

    val cepOutput =
      MyCEP(new DataStreamUtils(cepInput).reinterpretAsKeyedStream(r => (r.id1, r.id2)),
        counter1, counter2,
        threshold1, threshold2)

    object MyCEP {
      def apply(input: KeyedStream[Event, _],
                longPeriod: Int,
                threshold: Int,
                shortPeriod: Int): DataStream[Event] = {

        val patternLineIsUp = Pattern.begin[Event]("period1")
          .where((value: Event, ctx: CepContext[Event]) =>
            accSum(_.counter, Seq("period1"), value, ctx) < threshold)
          .times(longPeriod - shortPeriod).consecutive()
          .next("period2")
          .where((value: Event, ctx: CepContext[Event]) =>
            accSum(_.counter, Seq("period1", "period2"), value, ctx) < threshold &&
              value.status == "up")
          .times(shortPeriod).consecutive()

        collectPattern(input, patternLineIsUp)
      }

      // Sum of f over the events already matched under the given pattern names,
      // plus the current event.
      private def accSum(f: Event => Long, keys: Seq[String], currentEvent: Event,
                         ctx: CepContext[Event]): Long = {
        keys.map(key => ctx.getEventsForPattern(key).map(f).sum).sum + f(currentEvent)
      }

      // Emits the last event of the "period2" part of every match.
      private def collectPattern(inputStream: KeyedStream[Event, _],
                                 pattern: Pattern[Event, Event]): DataStream[Event] =
        CEP.pattern(inputStream, pattern)
          .process((map: util.Map[String, util.List[Event]],
                    ctx: PatternProcessFunction.Context,
                    collector: Collector[Event]) => {
            val records = map.get("period2")
            collector.collect(records.get(records.size() - 1))
          })
    }

The exception:
Exception in thread "main" 12:43:13,103 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
    at com.mycompany.StreamingJob$.main(Streaming.scala:440)
    at com.mycompany.StreamingJob.main(Streaming.scala)
Caused by: java.lang.IllegalArgumentException
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:215)
    at org.apache.flink.cep.operator.CepOperator.saveRegisterWatermarkTimer(CepOperator.java:285)
    at org.apache.flink.cep.operator.CepOperator.processElement(CepOperator.java:266)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
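For reference, the innermost frame of that trace, HeapPriorityQueueSet.globalKeyGroupToLocalIndex, fails a Preconditions.checkArgument while the CEP operator registers an event-time timer. My understanding (not verified line by line against the Flink sources) is that this checks whether the timer's key group lies inside the contiguous key-group range owned by the current subtask. The sketch below is a hypothetical pure-Scala model of that check; KeyGroupRanges, rangeForSubtask and localIndex are illustrative names, and the range formula is meant to mirror KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex.

```scala
// Hypothetical model of per-subtask key-group ranges and the local-index
// range check; names are illustrative, not Flink API.
object KeyGroupRanges {

  // Inclusive range of key groups owned by one subtask.
  final case class KeyGroupRange(start: Int, end: Int) {
    def contains(keyGroup: Int): Boolean = keyGroup >= start && keyGroup <= end
  }

  // Key groups owned by `subtask` when maxParallelism key groups are
  // split into `parallelism` contiguous ranges.
  def rangeForSubtask(maxParallelism: Int, parallelism: Int, subtask: Int): KeyGroupRange = {
    val start = (subtask * maxParallelism + parallelism - 1) / parallelism
    val end = ((subtask + 1) * maxParallelism - 1) / parallelism
    KeyGroupRange(start, end)
  }

  // Translates a global key group into an index local to this subtask.
  // Like Preconditions.checkArgument, it throws a bare
  // IllegalArgumentException when the key group is not owned locally.
  def localIndex(range: KeyGroupRange, globalKeyGroup: Int): Int = {
    if (!range.contains(globalKeyGroup)) throw new IllegalArgumentException()
    globalKeyGroup - range.start
  }
}
```

With parallelism 1 the single subtask owns every key group, so the check cannot fail, which would be consistent with the job running fine at parallelism 1. With a higher parallelism, a record whose key hashes to another subtask's range trips the check as soon as its timer is registered.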



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
