[jira] [Commented] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

2020-03-12 Thread Nico Kruber (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17058254#comment-17058254
 ] 

Nico Kruber commented on FLINK-11774:
-

OK, after looking at the OP's code and the other occurrences, I think I'll split 
off my problem from this issue, since mine is much more low-level.

[~fwollsch]: Since Flink has to go through the whole type hierarchy anyway to 
gather type information and (eventually) create the serializer, I don't expect 
much overhead, especially since this only has to be done once. But maybe that's 
something to continue in FLINK-16555.

> IllegalArgumentException in HeapPriorityQueueSet
> 
>
> Key: FLINK-11774
> URL: https://issues.apache.org/jira/browse/FLINK-11774
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.7.2, 1.9.2, 1.10.0
> Environment: Can reproduce on the following configurations:
>  
> OS: macOS 10.14.3
> Java: 1.8.0_202
>  
> OS: CentOS 7.2.1511
> Java: 1.8.0_102
>Reporter: Kirill Vainer
>Priority: Blocker
> Fix For: 1.9.3, 1.10.1, 1.11.0
>
> Attachments: flink-bug-dist.zip, flink-bug-src.zip
>
>
> Hi,
> I encountered the following exception:
> {code}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
> at flink.bug.App.main(App.java:21)
> Caused by: java.lang.IllegalArgumentException
> at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
> at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:197)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:876)
> at 
> org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36)
> at 
> org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:895)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:396)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> Code that reproduces the problem:
> {code:java}
> package flink.bug;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> public class App {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(2);
> env.fromElements(1, 2)
> .map(Aggregate::new)
> .keyBy(Aggregate::getKey)
> .timeWindow(Time.seconds(2))
> .reduce(Aggregate::reduce)
> .addSink(new CollectSink());
> env.execute();
> }
> private static class Aggregate {
> private Key key = new Key();
> public Aggregate(long number) {
> }
> public static Aggr

[jira] [Commented] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

2020-03-12 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17057992#comment-17057992
 ] 

Felix Wollschläger commented on FLINK-11774:


[~NicoK]
{quote}
I would propose to create a separate (improvement) ticket for allowing enums 
(with the simple workaround of using their ordinal in the key extractor) and 
stick to the second issue here?
{quote}

I think the "problem" is not that enums are used directly as keys, but that they 
are used as part of a key (e.g. inside a `Tuple2`). At a low level you 
could check whether the key itself is an enum and use the ordinal in that case. If 
you want to handle enums as part of the key, you'd have to search for every 
enum member in the class hierarchy, which may be too costly at such a low level.
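
To illustrate what "searching the class hierarchy for enum members" could look like, here is a minimal sketch based on plain Java reflection. It is not how Flink extracts type information; the class `EnumFieldScan`, the method `containsEnum`, and the sample types are hypothetical names chosen for this example. The sketch also assumes that JDK classes other than enums need not be descended into.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.time.DayOfWeek;
import java.util.HashSet;
import java.util.Set;

final class EnumFieldScan {

    /** Hypothetical composite key with an enum member. */
    static final class CompositeKey {
        DayOfWeek day;
        String name;
    }

    /** Hypothetical key without any enum member. */
    static final class PlainKey {
        int id;
        String name;
    }

    /** Hypothetical wrapper that reaches an enum only through a nested type. */
    static final class WrapperKey {
        CompositeKey inner;
    }

    /** Returns true if the type is an enum or has an enum-typed instance field, directly or nested. */
    static boolean containsEnum(Class<?> type) {
        return containsEnum(type, new HashSet<>());
    }

    private static boolean containsEnum(Class<?> type, Set<Class<?>> visited) {
        // Primitives cannot be enums; the visited set guards against cyclic type graphs.
        if (type == null || type.isPrimitive() || !visited.add(type)) {
            return false;
        }
        if (type.isEnum()) {
            return true;
        }
        if (type.isArray()) {
            return containsEnum(type.getComponentType(), visited);
        }
        // Assumption of this sketch: non-enum JDK classes are not inspected further.
        if (type.getName().startsWith("java.")) {
            return false;
        }
        // Walk the class and all of its superclasses, looking at instance fields only.
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                if (!Modifier.isStatic(field.getModifiers())
                        && containsEnum(field.getType(), visited)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(containsEnum(CompositeKey.class));
        System.out.println(containsEnum(PlainKey.class));
        System.out.println(containsEnum(WrapperKey.class));
    }
}
```

Because of type erasure, a field declared with a generic type parameter only exposes its erased type through `Field.getType()`, so a scan like this would miss an enum stored in a generic container unless additional type information is available.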


[jira] [Commented] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

2020-03-12 Thread Nico Kruber (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17057809#comment-17057809
 ] 

Nico Kruber commented on FLINK-11774:
-

I narrowed it down to the state access inside {{DeltaTrigger}}: Removing the 
cleanup in 
{{org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger#clear()}} did 
not change anything but removing state access in its {{onElement}} (pictured 
below) seems to resolve the bug:
{code:java}
@Override
public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
    ValueState<T> lastElementState = ctx.getPartitionedState(stateDesc);
    if (lastElementState.value() == null) {
        lastElementState.update(element);
        return TriggerResult.CONTINUE;
    }
    if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
        lastElementState.update(element);
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}
{code}


[jira] [Commented] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

2020-03-12 Thread Nico Kruber (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17057748#comment-17057748
 ] 

Nico Kruber commented on FLINK-11774:
-

There are actually two bugs here:
 * enums not being viable sources for keys - 
[FLINK-16555|https://issues.apache.org/jira/browse/FLINK-16555] proposes to add 
warnings for this and other cases
 * something in the heap state back-end that causes wrong keygroup mappings 
with the exceptions I posted
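
To make the key-group mapping more concrete, here is a minimal sketch of how key groups can be split into contiguous per-subtask ranges. It assumes a maximum parallelism of 128 and uses formulas modelled on what Flink's `KeyGroupRangeAssignment` is understood to do; the names `KeyGroupMath`, `rangeStart`, `rangeEnd` and `ownerOf` are hypothetical and not Flink API. Under these assumptions, the range 64..95 from the `KeyGroupRange{startKeyGroup=64, endKeyGroup=95} does not contain key group 97` exception reported in this thread matches subtask 3/4, while key group 97 would belong to subtask 4/4.

```java
final class KeyGroupMath {

    /** First key group owned by the subtask with the given zero-based index. */
    static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the subtask with the given zero-based index. */
    static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** Zero-based index of the subtask that owns the given key group. */
    static int ownerOf(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value, not taken from the job in this thread
        int parallelism = 4;      // the restore parallelism mentioned in the thread

        // Subtask "3/4" in the log output has the zero-based index 2.
        System.out.println(rangeStart(maxParallelism, parallelism, 2)
                + ".." + rangeEnd(maxParallelism, parallelism, 2));

        // Zero-based index of the subtask that would own key group 97.
        System.out.println(ownerOf(97, maxParallelism, parallelism));
    }
}
```

If an element whose key hashes to key group 97 ends up in the state of the subtask responsible for 64..95, a range check like the one in `HeapPriorityQueueSet.globalKeyGroupToLocalIndex` would be expected to fail in this way.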

I would propose to create a separate (improvement) ticket for allowing enums 
(with the simple workaround of using their ordinal in the key extractor) and 
stick to the second issue here?
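
The ordinal workaround mentioned above could look roughly like the following sketch. It uses `java.util.function.Function` as a stand-in for Flink's `KeySelector` so that it runs without Flink on the classpath; `OrdinalKeyExample`, `Event` and `KEY_BY_ORDINAL` are hypothetical names for this illustration.

```java
import java.time.DayOfWeek;
import java.util.function.Function;

final class OrdinalKeyExample {

    /** Hypothetical event type, used only for this illustration. */
    static final class Event {
        final DayOfWeek day;
        final String user;

        Event(DayOfWeek day, String user) {
            this.day = day;
            this.user = user;
        }
    }

    /**
     * Stand-in for a key extractor: keys by the enum's ordinal (an Integer)
     * instead of the enum itself, so the key's hashCode() does not depend
     * on object identity.
     */
    static final Function<Event, Integer> KEY_BY_ORDINAL = event -> event.day.ordinal();

    public static void main(String[] args) {
        Event event = new Event(DayOfWeek.WEDNESDAY, "alice");
        Integer key = KEY_BY_ORDINAL.apply(event);
        // Integer.hashCode() is defined as the int value itself.
        System.out.println(key + " -> hash " + key.hashCode());
    }
}
```

One caveat of this design: ordinals change when enum constants are reordered or inserted, so keying by `name()` may be the safer choice where the enum definition is expected to evolve.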

 

As for that, I actually have it reproducible in a test that I quickly hacked 
together in 
[https://github.com/NicoK/flink/blob/state.corruption.debug/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingSavepointRestoreITCase.java].
 You can actually find all sorts of different exceptions associated with it, 
the two I mentioned but also this one:
{code:java}
10:23:12,046 WARN  
org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Exception 
while restoring keyed state backend for 
EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/4) from alternative 
(1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to 
restore heap backend
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Corrupt stream, found tag: 8
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at 
org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:133)
at 
org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42)
at 
org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
at 
org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:295)
at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:256)
at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:155)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
... 15 more{code}


[jira] [Commented] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

2020-03-11 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17057506#comment-17057506
 ] 

Felix Wollschläger commented on FLINK-11774:


Maybe extending the `KeySelector` interface with a `buildHashCode` method could 
help us use enum types inside tuples as keys:

{code}
package example;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.Function;

import java.io.Serializable;

@FunctionalInterface
@Public
public interface KeySelector<IN, KEY> extends Function, Serializable {

    KEY getKey(IN element) throws Exception;

    default int buildHashCode(KEY key) throws Exception {
        return key.hashCode();
    }
}
{code}

For example:

{code}
import org.apache.flink.annotation.Public;

@FunctionalInterface
@Public
public interface HashStrategy<T> {

    default void initialize() throws Exception {}

    int buildHashCode(T key) throws Exception;
}
{code}

{code}
package example;

public enum EnumHashStrategy implements HashStrategy<Enum<?>> {

    ORDINAL {
        @Override
        public int buildHashCode(Enum<?> value) {
            return 31 * value.ordinal();
        }
    },
    NAME {
        @Override
        public int buildHashCode(Enum<?> value) {
            return value.name().hashCode();
        }
    }
}
{code}

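{code}
package example;

import org.apache.flink.api.java.tuple.Tuple2;

import java.time.DayOfWeek;

// The generic type arguments were lost in this archive; the String component
// of the key tuple is an assumption.
public class MyKeySelector implements KeySelector<Tuple2<Tuple2<DayOfWeek, String>, String>, Tuple2<DayOfWeek, String>> {

    @Override
    public Tuple2<DayOfWeek, String> getKey(Tuple2<Tuple2<DayOfWeek, String>, String> element) throws Exception {
        return element.f0;
    }

    @Override
    public int buildHashCode(Tuple2<DayOfWeek, String> key) throws Exception {
        int result = EnumHashStrategy.ORDINAL.buildHashCode(key.f0);
        return result * 31 + key.f1.hashCode();
    }
}
{code}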


[jira] [Commented] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

2020-03-11 Thread Stephan Ewen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17057380#comment-17057380
 ] 

Stephan Ewen commented on FLINK-11774:
--

I created this as a helper: https://issues.apache.org/jira/browse/FLINK-16555

> IllegalArgumentException in HeapPriorityQueueSet
> 
>
> Key: FLINK-11774
> URL: https://issues.apache.org/jira/browse/FLINK-11774
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.7.2
> Environment: Can reproduce on the following configurations:
>  
> OS: macOS 10.14.3
> Java: 1.8.0_202
>  
> OS: CentOS 7.2.1511
> Java: 1.8.0_102
>Reporter: Kirill Vainer
>Priority: Blocker
> Fix For: 1.9.3, 1.10.1, 1.11.0
>
> Attachments: flink-bug-dist.zip, flink-bug-src.zip
>
>
> Hi,
> I encountered the following exception:
> {code}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
> at flink.bug.App.main(App.java:21)
> Caused by: java.lang.IllegalArgumentException
> at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
> at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:197)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:876)
> at 
> org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36)
> at 
> org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:895)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:396)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> Code that reproduces the problem:
> {code:java}
> package flink.bug;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> public class App {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(2);
> env.fromElements(1, 2)
> .map(Aggregate::new)
> .keyBy(Aggregate::getKey)
> .timeWindow(Time.seconds(2))
> .reduce(Aggregate::reduce)
> .addSink(new CollectSink());
> env.execute();
> }
> private static class Aggregate {
> private Key key = new Key();
> public Aggregate(long number) {
> }
> public static Aggregate reduce(Aggregate a, Aggregate b) {
> return new Aggregate(0);
> }
> public Key getKey() {
> return key;
> }
> }
> public static class Key {
> }
> private static class CollectSink implements SinkFunction {
> private static final long serialVersionUID = 1;
> @SuppressWarnings("rawtypes")
> 

[jira] [Commented] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

2020-03-11 Thread Nico Kruber (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17057377#comment-17057377
 ] 

Nico Kruber commented on FLINK-11774:
-

I actually occasionally also see this with the {{TopSpeedWindowing}} example in 
Flink. I managed to get hold of a savepoint that was taken with parallelism 2 
(p=2) and shows a similar failure in two different ways (all running in Flink 
1.10.0; but I also see it in Flink 1.9):
* first of all, if I try to restore with p=2, everything is fine
* if I restore with p=4 I get an exception like the one mentioned above:
{code}
2020-03-11 15:53:35,149 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- 
Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, 
PassThroughWindowFunction) -> Sink: Print to Std. Out (3/4) 
(2ecdb03905cc8a376d43b086925452a6) switched from RUNNING to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/4) from 
any of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore heap backend
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=64, 
endKeyGroup=95} does not contain key group 97
at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
at 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
at 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
at 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
at 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
at 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper.lambda$keyGroupReader$0(HeapPriorityQueueSnapshotRestoreWrapper.java:85)
at 
org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:298)
at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:11

[jira] [Commented] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

2020-03-11 Thread Stephan Ewen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17057376#comment-17057376
 ] 

Stephan Ewen commented on FLINK-11774:
--

A typical reason for this is when the hash code of a key is not stable across 
JVMs.
It looks like the hashCode of Enum is just the object reference's hash code, 
which is definitely not stable across JVMs. So any enum as a key will lead to 
incorrect key routing, and thus incorrect aggregates.
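
To illustrate the point about enum hash codes, here is a minimal sketch in plain Java with no Flink dependency. The enum `EventType` and both helper methods are hypothetical names made up for this example. `Enum.hashCode()` is final and falls back to `Object.hashCode()`, so its value can differ from one JVM run to the next, while the hash of the constant's name is fixed by the `String.hashCode()` specification.

```java
public class StableEnumKey {
    // Hypothetical enum standing in for an enum used as (part of) a key.
    public enum EventType { CREATED, UPDATED, DELETED }

    // Enum.hashCode() is final and delegates to Object.hashCode(), i.e. an
    // identity-based value that is not guaranteed to repeat across JVM runs.
    public static int identityBasedHash(EventType type) {
        return type.hashCode();
    }

    // String.hashCode() is specified by the Java API, so hashing the constant's
    // name yields the same value in every JVM.
    public static int stableHash(EventType type) {
        return type.name().hashCode();
    }

    public static void main(String[] args) {
        // Run this twice: the "stable" column repeats, the "identity" column may not.
        for (EventType t : EventType.values()) {
            System.out.println(t + " identity=" + identityBasedHash(t)
                    + " stable=" + stableHash(t));
        }
    }
}
```

Whether a given Flink version hashes enum keys through `Enum.hashCode()` depends on the key type and serializer in use, so treat this only as a demonstration of the JVM behaviour described above.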

> IllegalArgumentException in HeapPriorityQueueSet
> 
>
> Key: FLINK-11774
> URL: https://issues.apache.org/jira/browse/FLINK-11774
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.7.2
> Environment: Can reproduce on the following configurations:
>  
> OS: macOS 10.14.3
> Java: 1.8.0_202
>  
> OS: CentOS 7.2.1511
> Java: 1.8.0_102
>Reporter: Kirill Vainer
>Priority: Blocker
> Fix For: 1.9.3, 1.10.1, 1.11.0
>
> Attachments: flink-bug-dist.zip, flink-bug-src.zip
>
>
> Hi,
> I encountered the following exception:
> {code}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
> at flink.bug.App.main(App.java:21)
> Caused by: java.lang.IllegalArgumentException
> at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
> at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:197)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:876)
> at 
> org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36)
> at 
> org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:895)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:396)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> Code that reproduces the problem:
> {code:java}
> package flink.bug;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> public class App {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(2);
> env.fromElements(1, 2)
> .map(Aggregate::new)
> .keyBy(Aggregate::getKey)
> .timeWindow(Time.seconds(2))
> .reduce(Aggregate::reduce)
> .addSink(new CollectSink());
> env.execute();
> }
> private static class Aggregate {
> private Key key = new Key();
> public Aggregate(long number) {
> }
> public static Aggregate reduce(Aggregate a, Aggregate b) {
> return new Aggregate(0);
> }
> public Key getKey() {
> return key;
> }
>  

[jira] [Commented] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

2020-01-13 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014344#comment-17014344
 ] 

Felix Wollschläger commented on FLINK-11774:


This error still occurs +sometimes+ if we use 
*_org.apache.flink.api.java.tuple.Tuple2_* as key and only when restoring from 
a checkpoint/savepoint.

The Tuple is created inside the KeySelector based on 2 final fields in our 
event-object (the key will never change during the lifetime of the object).
The first item in the Tuple is an _Enumtype_ and the second one a 
_java.lang.Long_. As a workaround we decided to use a string-concatenation 
instead of a Tuple2.
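
A rough sketch of the string-concatenation workaround, in plain Java. `Event`, `Category` and `KEY_SELECTOR` are hypothetical names, and `java.util.function.Function` is only a stand-in for Flink's `KeySelector` so that the snippet compiles without Flink on the classpath. The separator is an assumption added here to keep the two parts of the key unambiguous.

```java
import java.util.function.Function;

public class StringKeyWorkaround {
    // Hypothetical enum standing in for the enum that was the first tuple field.
    public enum Category { ORDER, PAYMENT }

    // Hypothetical event type with two final fields that make up the key.
    public static final class Event {
        final Category category;
        final long id;

        public Event(Category category, long id) {
            this.category = category;
            this.id = id;
        }
    }

    // Stand-in for a KeySelector<Event, String>: the key is built from the enum
    // constant's name and the long value, so its hash code comes from
    // String.hashCode() and does not depend on the enum's identity hash.
    public static final Function<Event, String> KEY_SELECTOR =
            e -> e.category.name() + "|" + e.id;

    public static void main(String[] args) {
        System.out.println(KEY_SELECTOR.apply(new Event(Category.ORDER, 42L)));
    }
}
```

In a real job the same lambda body would go into the `keyBy` key selector in place of the Tuple2 construction.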


[jira] [Commented] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

2019-08-07 Thread Julian Bauss (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16902035#comment-16902035
 ] 

Julian Bauss commented on FLINK-11774:
--

I encountered the same issue while using a Tuple2 as a key 
on a Stream.

> IllegalArgumentException in HeapPriorityQueueSet
> 
>
> Key: FLINK-11774
> URL: https://issues.apache.org/jira/browse/FLINK-11774
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.7.2
> Environment: Can reproduce on the following configurations:
>  
> OS: macOS 10.14.3
> Java: 1.8.0_202
>  
> OS: CentOS 7.2.1511
> Java: 1.8.0_102
>Reporter: Kirill Vainer
>Priority: Major
> Attachments: flink-bug-dist.zip, flink-bug-src.zip
>
>
> Hi,
> I encountered the following exception:
> {code}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
> at flink.bug.App.main(App.java:21)
> Caused by: java.lang.IllegalArgumentException
> at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
> at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:197)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:876)
> at 
> org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36)
> at 
> org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:895)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:396)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> Code that reproduces the problem:
> {code:java}
> package flink.bug;
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> public class App {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(2);
>         env.fromElements(1, 2)
>             .map(Aggregate::new)
>             .keyBy(Aggregate::getKey)
>             .timeWindow(Time.seconds(2))
>             .reduce(Aggregate::reduce)
>             .addSink(new CollectSink());
>         env.execute();
>     }
>
>     private static class Aggregate {
>         private Key key = new Key();
>
>         public Aggregate(long number) {
>         }
>
>         public static Aggregate reduce(Aggregate a, Aggregate b) {
>             return new Aggregate(0);
>         }
>
>         public Key getKey() {
>             return key;
>         }
>     }
>
>     public static class Key {
>     }
>
>     private static class CollectSink implements SinkFunction {
>         private static final long serialVersionUID = 1;
>
>         @SuppressWarnings("rawtypes")
>         @Override
>         public void invoke(Aggregate value, Contex

[jira] [Commented] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

2019-06-26 Thread Xulang Wan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873124#comment-16873124
 ] 

Xulang Wan commented on FLINK-11774:


I also encountered this issue by using a string as key. The key is randomly 
generated between [0,1000] per element. 

It seems the error happens because the key group is outside the KeyGroupRange, 
which doesn't make sense to me since neither of them should change once the 
application is running.
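
For readers trying to follow the "key group outside the KeyGroupRange" check, here is a simplified model in plain Java. The range arithmetic follows my understanding of how Flink splits key groups across subtasks and should be read as an assumption, not as the actual Flink source. `keyGroupFor` is deliberately simplified: as far as I know Flink also runs the key's hash code through a murmur hash first, which is left out here. The values 128 (max parallelism) and 4 (parallelism) are assumptions chosen to match the "KeyGroupRange{startKeyGroup=64, endKeyGroup=95} does not contain key group 97" message and the "(3/4)" subtask reported earlier in this thread.

```java
public class KeyGroupSketch {
    // First key group (inclusive) assumed to be owned by the given subtask.
    public static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group (inclusive) assumed to be owned by the given subtask.
    public static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // Simplified stand-in for the key-to-key-group mapping: only the modulo
    // step is modelled, the additional hash mixing is omitted.
    public static int keyGroupFor(int keyHash, int maxParallelism) {
        return Math.floorMod(keyHash, maxParallelism);
    }

    // True if the key group falls inside the subtask's inclusive range.
    public static boolean owns(int maxParallelism, int parallelism, int subtaskIndex, int keyGroup) {
        return keyGroup >= rangeStart(maxParallelism, parallelism, subtaskIndex)
                && keyGroup <= rangeEnd(maxParallelism, parallelism, subtaskIndex);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed
        int parallelism = 4;      // assumed
        int subtaskIndex = 2;     // zero-based index of subtask "(3/4)"
        System.out.println("range = [" + rangeStart(maxParallelism, parallelism, subtaskIndex)
                + ", " + rangeEnd(maxParallelism, parallelism, subtaskIndex) + "]");
        System.out.println("owns key group 97: "
                + owns(maxParallelism, parallelism, subtaskIndex, 97));
    }
}
```

Under these assumptions the subtask owns key groups 64 to 95, so a key whose hash maps to group 97 fails the range check. The range itself does not change while the job runs. What can change is the key group computed for a key, if the key's `hashCode()` returns a different value at the time the check is made than it did when the record was routed.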


[jira] [Commented] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

2019-03-07 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16786807#comment-16786807
 ] 

Felix Wollschläger commented on FLINK-11774:


I encountered the same exception when using a custom type as a key. It seems 
like it only occurs when restoring from a savepoint or checkpoint.

 

I already encountered it back with Flink 1.3.2 (so it doesn't seem to be a new 
bug) and kept the stacktrace. If it helps, this was the stacktrace in Flink 
1.3.2:
{code:java}
/**
 * Causes the job to throw:
 * java.lang.IllegalArgumentException: Key Group 75 does not belong to the local range.
 * at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
 * at org.apache.flink.streaming.api.operators.HeapInternalTimerService.getIndexForKeyGroup(HeapInternalTimerService.java:409)
 * at org.apache.flink.streaming.api.operators.HeapInternalTimerService.getEventTimeTimerSetForKeyGroup(HeapInternalTimerService.java:362)
 * at org.apache.flink.streaming.api.operators.HeapInternalTimerService.getEventTimeTimerSetForTimer(HeapInternalTimerService.java:352)
 * at org.apache.flink.streaming.api.operators.HeapInternalTimerService.registerEventTimeTimer(HeapInternalTimerService.java:226)
 * at org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:54)
 * at SomeProcessFunction.processElement(SomeProcessFunction.java:115)
 * at SomeProcessFunction.processElement(SomeProcessFunction.java:35)
 * at SomeAbstractProcessFunction.processElement(SomeAbstractProcessFunction.java:26)
 * at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
 * at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
 * at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
 * at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
 * at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 * at java.lang.Thread.run(Thread.java:748)
 */{code}
Back then I simply changed my key to a String, but it would be nice if this 
were fixed in future versions.


[jira] [Commented] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

2019-03-07 Thread Thi Viet Quynh Nguyen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16786803#comment-16786803
 ] 

Thi Viet Quynh Nguyen commented on FLINK-11774:
---

I also encountered the same problem, interestingly only when the job is 
restarted from a savepoint or checkpoint. 
When it is started without one, there is no problem.


[jira] [Commented] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

2019-02-28 Thread Congxian Qiu(klion26) (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780426#comment-16780426
 ] 

Congxian Qiu(klion26) commented on FLINK-11774:
---

Running the given code on the master branch reproduces the problem as well. 
I'll start investigating this.
