EventTime in streaming
After some working experience with the current solution, I want to give some feedback and maybe start a discussion about event time in streaming. This is not about watermarks or any of the upcoming improvements, just some observations from the current code.

*Start time for EventTime:* In the current implementation you can specify a start time; if you don't, it defaults to 0. That default is not feasible when using the typical milliseconds since 1970. The *TimeTriggerPolicy* has the following implementation of *preNotifyTrigger*:

> @Override
> public synchronized Object[] preNotifyTrigger(DATA datapoint) {
>     LinkedList fakeElements = new LinkedList();
>     // check if there is more than one window border missed
>     // use > here. In case >= would fit, the regular call will do the job.
>     while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity) {
>         startTime += granularity;
>         fakeElements.add(startTime - 1);
>     }
>     return (Object[]) fakeElements.toArray();
> }

In practice this means that using the default start time will crash the program (running out of memory), since it creates fake elements to close every possible window since 1970. So you need to set a start time to make it run, which is not that simple. In production you could use the system time to initialize, but this might lead to problems when consuming events with older timestamps, e.g. from Kafka. When debugging with old streams, you need to know the lowest timestamp of the stream to initialize.

What is the purpose of the fake elements? Is there a way to avoid the memory problem of creating enormous amounts of empty windows? Could we just use the timestamp of the first event processed as the start time instead of having it as a parameter? I am testing the following modification of the above code at the moment, do you see any problem with that?
> @Override
> public synchronized Object[] preNotifyTrigger(DATA datapoint) {
>     LinkedList fakeElements = new LinkedList();
>     // check if there is more than one window border missed
>     // use > here. In case >= would fit, the regular call will do the job.
>     // TODO modified here
>     if (startTime == 0) {
>         startTime = timestampWrapper.getTimestamp(datapoint);
>     }
>     while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity) {
>         startTime += granularity;
>         fakeElements.add(startTime - 1);
>     }
>     return (Object[]) fakeElements.toArray();
> }

*EventTime API confusion:* I found several ways to use event time in my program, but I find them not very intuitive. Compare the two following lines of code, both using the Time.of helper, one with event time and one with system time:

ds.window(Time.of(long windowSize, TimeUnit))
ds.window(Time.of(long windowSize, Timestamp yourTimeStampExtractor, long startTime))

It's weird that you cannot specify the TimeUnit when using the event timestamp. It would feel more natural if it looked like this (also without the start time):

ds.window(Time.of(long windowSize, TimeUnit, Timestamp yourTimeStampExtractor))

At the moment I'm using the modified TimeTriggerPolicy directly, leading to this ugly piece of code:

.window(new TimeTriggerPolicyHack(10L, new TimestampWrapper(new EventTimeStampExtractor(), 0L)),
        new TimeEvictionPolicy(2, new TimestampWrapper(new EventTimeStampExtractor(), 0L)))

cheers Martin
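PS: stripped of the Flink-specific types, the change I'm testing boils down to the following self-contained sketch (class and method names are simplified here for discussion, not the actual Flink classes):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of TimeTriggerPolicy.preNotifyTrigger with the proposed
// change: startTime == 0 is treated as "uninitialized" and set lazily from
// the first element's timestamp.
class LazyStartTimeTrigger {
    private long startTime;            // 0 means "not yet initialized"
    private final long granularity;

    LazyStartTimeTrigger(long granularity) {
        this.granularity = granularity;
    }

    // Returns the fake elements (missed window borders) to emit.
    List<Long> preNotifyTrigger(long timestamp) {
        if (startTime == 0) {
            startTime = timestamp;     // lazy init: no windows before the first event
        }
        List<Long> fakeElements = new ArrayList<>();
        while (timestamp >= startTime + granularity) {
            startTime += granularity;
            fakeElements.add(startTime - 1);
        }
        return fakeElements;
    }
}
```

With millisecond epoch timestamps, the first call now only initializes the start time and emits nothing, instead of emitting one fake element per elapsed window since 1970.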
Tests - Unit Tests versus Integration Tests
Hi all!

The build time of Flink with all tests is nearing 1h on Travis for the shortest run. It is good that we do extensive testing; there are many mechanisms that need it. I have also seen that a lot of fixes that could be tested in a unit-test style are actually tested as a full Flink program (integration-test style). While these tests are always easier to write, they have two problems:
- They bring up the build time by about 5 secs per test
- They are often not as targeted to the problem as a unit test

I would like to encourage everyone to keep this in mind and write unit tests in the cases where they are the preferred choice. Please also keep that in mind when reviewing pull requests.

For example:
- API / TypeInformation changes can be tested very well without running the program. Simply create the program and test the operator's type info.
- Custom functions can be tested very well in isolation.
- Input/output formats actually test well in unit tests.

Integration tests need to be used when verifying behavior across components / layers, so keep using them when they are needed.

Greetings, Stephan
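PS: to make the "custom functions in isolation" point concrete, here is a sketch of the unit-test style. The interface is mirrored here so the snippet stands alone; in a real test you would implement Flink's own MapFunction and use JUnit:

```java
// Minimal stand-in for Flink's MapFunction interface, so this sketch
// compiles without a Flink dependency.
interface MapFunction<IN, OUT> {
    OUT map(IN value) throws Exception;
}

// A hypothetical user-defined function under test: counts words in a line.
class WordCounter implements MapFunction<String, Integer> {
    @Override
    public Integer map(String line) {
        String trimmed = line.trim();
        return trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length;
    }
}
```

A unit test then just instantiates WordCounter and asserts on the result of map() directly: no job graph, no cluster, and it runs in milliseconds instead of adding seconds to the build.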
[jira] [Created] (FLINK-2693) Refactor InvalidTypesException to be checked
Chesnay Schepler created FLINK-2693:
---
Summary: Refactor InvalidTypesException to be checked
Key: FLINK-2693
URL: https://issues.apache.org/jira/browse/FLINK-2693
Project: Flink
Issue Type: Improvement
Components: Java API
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
Priority: Minor

When the TypeExtractor fails, it generally throws an InvalidTypesException. This is currently an unchecked exception, although we sometimes recover from it, usually by creating a MissingTypeInfo manually. Furthermore, the extractor can also throw IllegalArgumentExceptions in some cases. Figuring out which exception is thrown under which conditions is pretty tricky, causing issues such as FLINK-2557. This should be rectified by
1. making InvalidTypesException a checked exception
2. only throwing an InvalidTypesException upon failure

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
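To illustrate the proposed change, a toy sketch (stand-in classes, not Flink's actual API): once the exception is checked, the compiler forces every call site to either propagate the failure or recover explicitly, e.g. with a MissingTypeInfo-style placeholder.

```java
// Toy model of the refactoring: a checked extraction failure.
class InvalidTypesException extends Exception {
    InvalidTypesException(String message) { super(message); }
}

class TypeInfo {
    final String name;
    TypeInfo(String name) { this.name = name; }
}

class TypeExtractorSketch {
    // The extractor throws only InvalidTypesException on failure (never
    // IllegalArgumentException), so callers know exactly what to catch.
    static TypeInfo extract(Object value) throws InvalidTypesException {
        if (value == null) {
            throw new InvalidTypesException("type could not be determined");
        }
        return new TypeInfo(value.getClass().getSimpleName());
    }

    // A recovering call site, analogous to creating a MissingTypeInfo manually.
    static TypeInfo extractOrMissing(Object value) {
        try {
            return extract(value);
        } catch (InvalidTypesException e) {
            return new TypeInfo("MissingTypeInfo");
        }
    }
}
```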
Flink ML linear regression issue
Hi

I'm using Flink ML 9.2.1 in order to perform a multiple linear regression with a csv data file.

The Scala sample code for it is pretty straightforward:

val mlr = MultipleLinearRegression()

val parameters = ParameterMap()
parameters.add(MultipleLinearRegression.Stepsize, 2.0)
parameters.add(MultipleLinearRegression.Iterations, 10)
parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)

val inputDS = env.fromCollection(data)
mlr.fit(inputDS, parameters)

When I'm using Java (8), the fit method takes 3 parameters:
1. the dataset
2. the parameters
3. an object which implements the FitOperation interface

multipleLinearRegression.fit(regressionDS, parameters, fitOperation);

Is there a need to implement the FitOperation interface, which has already been implemented in Flink's ML source code?

Another option is using the MultipleLinearRegression.fitMLR() method, but I haven't found a way to pass the training dataset to it as a parameter or by a setter.

I'll be more than happy if you could guide me on how to implement it in Java.

Thanks

Hanan Meyer
[jira] [Created] (FLINK-2697) Deadlock in StreamDiscretizer
Till Rohrmann created FLINK-2697:
Summary: Deadlock in StreamDiscretizer
Key: FLINK-2697
URL: https://issues.apache.org/jira/browse/FLINK-2697
Project: Flink
Issue Type: Bug
Components: Streaming
Reporter: Till Rohrmann

Encountered a deadlock in the {{StreamDiscretizer}}

{code}
Found one Java-level deadlock:
=============================
"Thread-11":
  waiting to lock monitor 0x7f9d081e1ab8 (object 0xff6b4590, a org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer),
  which is held by "StreamDiscretizer -> TumblingGroupedPreReducer -> (Filter, ExtractParts) (3/4)"
"StreamDiscretizer -> TumblingGroupedPreReducer -> (Filter, ExtractParts) (3/4)":
  waiting to lock monitor 0x7f9d081e20e8 (object 0xff75fd88, a org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy),
  which is held by "Thread-11"

Java stack information for the threads listed above:
===================================================
"Thread-11":
    at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
    - waiting to lock <0xff6b4590> (a org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer)
    at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:203)
    at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
    - locked <0xff75fd88> (a org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy)
    at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
    at java.lang.Thread.run(Thread.java:745)
"StreamDiscretizer -> TumblingGroupedPreReducer -> (Filter, ExtractParts) (3/4)":
    at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.preNotifyTrigger(TimeTriggerPolicy.java:74)
    - waiting to lock <0xff75fd88> (a org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy)
    at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.processRealElement(StreamDiscretizer.java:91)
    - locked <0xff6b4590> (a org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer)
    at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.processElement(StreamDiscretizer.java:73)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:162)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:171)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
    at java.lang.Thread.run(Thread.java:745)

Found 1 deadlock.
{code}

https://s3.amazonaws.com/archive.travis-ci.org/jobs/80770719/log.txt

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2696) ZookeeperOffsetHandlerTest.runOffsetManipulationinZooKeeperTest failed on Travis
Till Rohrmann created FLINK-2696:
Summary: ZookeeperOffsetHandlerTest.runOffsetManipulationinZooKeeperTest failed on Travis
Key: FLINK-2696
URL: https://issues.apache.org/jira/browse/FLINK-2696
Project: Flink
Issue Type: Bug
Reporter: Till Rohrmann
Priority: Critical

The {{ZookeeperOffsetHandlerTest.runOffsetManipulationinZooKeeperTest}} failed on Travis with

{code}
-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running org.apache.flink.streaming.connectors.kafka.KafkaConsumerPartitionAssignmentTest
Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.18 sec - in org.apache.flink.streaming.connectors.kafka.KafkaConsumerPartitionAssignmentTest
Running org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandlerTest
org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 2
    at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:880)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
    at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createZookeeperClient(KafkaTestBase.java:278)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandlerTest.runOffsetManipulationinZooKeeperTest(ZookeeperOffsetHandlerTest.java:44)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
    at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 43.695 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandlerTest
runOffsetManipulationinZooKeeperTest(org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandlerTest)  Time elapsed: 21.258 sec <<< FAILURE!
java.lang.AssertionError: Unable to connect to zookeeper server within timeout: 2
    at org.junit.Assert.fail(Assert.java:88)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandlerTest.runOffsetManipulationinZooKeeperTest(ZookeeperOffsetHandlerTest.java:57)
Running org.apache.flink.streaming.connectors.kafka.KafkaConsumerTest
Tests run: 3, Failures: 0, Errors: 0, Skipped: 1, Time elapsed: 0.385 sec - in org.apache.flink.streaming.connectors.kafka.KafkaConsumerTest
Running org.apache.flink.streaming.connectors.kafka.TestFixedPartitioner
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.009 sec - in
{code}
[jira] [Created] (FLINK-2694) JobManagerProcessReapingTest.testReapProcessOnFailure failed on Travis
Till Rohrmann created FLINK-2694: Summary: JobManagerProcessReapingTest.testReapProcessOnFailure failed on Travis Key: FLINK-2694 URL: https://issues.apache.org/jira/browse/FLINK-2694 Project: Flink Issue Type: Bug Reporter: Till Rohrmann Priority: Critical I observed a failing {{JobManagerProcessReapingTest.testReapProcessOnFailure}} test case on Travis. The reason for the test failure seems to be that the {{JobManager}} could not be started. The reason for this was that Netty could not bind to the specified port. https://s3.amazonaws.com/archive.travis-ci.org/jobs/80642036/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2695) KafkaITCase.testConcurrentProducerConsumerTopology failed on Travis
Till Rohrmann created FLINK-2695:
Summary: KafkaITCase.testConcurrentProducerConsumerTopology failed on Travis
Key: FLINK-2695
URL: https://issues.apache.org/jira/browse/FLINK-2695
Project: Flink
Issue Type: Bug
Reporter: Till Rohrmann
Priority: Critical

The {{KafkaITCase.testConcurrentProducerConsumerTopology}} failed on Travis with

{code}
-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running org.apache.flink.streaming.connectors.kafka.KafkaProducerITCase
Running org.apache.flink.streaming.connectors.kafka.KafkaITCase
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 14.296 sec - in org.apache.flink.streaming.connectors.kafka.KafkaProducerITCase
09/16/2015 17:19:36  Job execution switched to status RUNNING.
09/16/2015 17:19:36  Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED
09/16/2015 17:19:36  Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING
09/16/2015 17:19:36  Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING
09/16/2015 17:19:36  Source: Custom Source -> Sink: Unnamed(1/1) switched to FINISHED
09/16/2015 17:19:36  Job execution switched to status FINISHED.
09/16/2015 17:19:36  Job execution switched to status RUNNING.
09/16/2015 17:19:36  Source: Custom Source -> Map -> Flat Map(1/1) switched to SCHEDULED
09/16/2015 17:19:36  Source: Custom Source -> Map -> Flat Map(1/1) switched to DEPLOYING
09/16/2015 17:19:36  Source: Custom Source -> Map -> Flat Map(1/1) switched to RUNNING
09/16/2015 17:19:36  Source: Custom Source -> Map -> Flat Map(1/1) switched to FAILED
java.lang.Exception: Could not forward element to next operator
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:171)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:332)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:316)
    at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:50)
    at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:30)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$SourceOutput.collect(SourceStreamTask.java:92)
    at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:88)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:449)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:332)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:316)
    at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:50)
    at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:30)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:329)
    ... 6 more
Caused by: org.apache.flink.streaming.connectors.kafka.testutils.SuccessException
    at org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase$7.flatMap(KafkaConsumerTestBase.java:896)
    at org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase$7.flatMap(KafkaConsumerTestBase.java:876)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:329)
    ... 11 more
09/16/2015 17:19:36  Job execution switched to status FAILING.
09/16/2015 17:19:36  Job execution switched to status FAILED.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at
{code}
[jira] [Created] (FLINK-2698) Add trailing newline to flink-conf.yaml
Greg Hogan created FLINK-2698:
-
Summary: Add trailing newline to flink-conf.yaml
Key: FLINK-2698
URL: https://issues.apache.org/jira/browse/FLINK-2698
Project: Flink
Issue Type: Improvement
Affects Versions: master
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Minor

The distributed flink-conf.yaml does not contain a trailing newline. This interferes with [bdutil|https://github.com/GoogleCloudPlatform/bdutil/blob/master/extensions/flink/install_flink.sh#L64], which appends extra/override configuration parameters with a heredoc. There are many other files without trailing newlines, but this looks to be the only detrimental effect.

{code}
for i in $(find * -type f); do
  if diff /dev/null "$i" | tail -1 | grep '^\\ No newline' > /dev/null; then
    echo $i
  fi
done
{code}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Flink ML linear regression issue
Hello everyone. Do you have a sample in Java of how to implement the Flink MultipleLinearRegression example? Scala is great, however we would like to see the exact example of how we could invoke it from Java, if that is possible. Thanks, and sorry for the interrupt.

On Thu, Sep 17, 2015 at 4:27 PM, Hanan Meyer wrote:
> Hi
>
> I'm using Flink ML 9.2.1 in order to perform a multiple linear regression
> with a csv data file.
>
> The Scala sample code for it is pretty straightforward:
> val mlr = MultipleLinearRegression()
>
> val parameters = ParameterMap()
>
> parameters.add(MultipleLinearRegression.Stepsize, 2.0)
> parameters.add(MultipleLinearRegression.Iterations, 10)
> parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
> val inputDS = env.fromCollection(data)
>
> mlr.fit(inputDS, parameters)
>
> When I'm using Java(8) the fit method includes 3 parameters
> 1. dataset
> 2. parameters
> 3. object which implements the FitOperation interface
>
> multipleLinearRegression.fit(regressionDS, parameters, fitOperation);
>
> Is there a need to implement the FitOperation interface which has already
> been implemented in Flink's ML source code?
>
> Another option is using the MultipleLinearRegression.fitMLR() method, but I
> haven't found a way to pass the train dataset to it as a parameter or by
> setter.
>
> I'll be more than happy if you could guide me how to implement it in Java
>
> Thanks
>
> Hanan Meyer

--
*Regards*
*Alexey Sapozhnikov*
CTO & Co-Founder, Scalabillit Inc
Aba Even 10-C, Herzelia, Israel
M : +972-52-2363823
E : ale...@scalabill.it
W : http://www.scalabill.it
YT - https://youtu.be/9Rj309PTOFA
Map: http://mapta.gs/Scalabillit
Revolutionizing Proof-of-Concept
Java type erasure and object reuse
What is best practice for handling Java type erasure in user defined functions? Is there a means by which the TypeInformation can be accessed from a RichFunction? My temporary solution was to add a "T copy()" method to the CopyableValue interface. A common use case is a GroupReduceFunction that needs to collect objects. With object reuse we need to make a copy and with type erasure we cannot call new. Greg Hogan
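PS: to make the workaround concrete, here is a self-contained sketch (types simplified and hypothetical; Flink's actual CopyableValue interface may differ). Generic code cannot call `new T()` because of erasure, but each concrete type can duplicate itself through a `copy()` method, which is exactly what a GroupReduceFunction needs when the runtime reuses its input objects:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the described workaround: the value type knows how to copy
// itself, so generic code never needs the erased type T to call `new`.
interface Copyable<T> {
    T copy();
}

class IntValue implements Copyable<IntValue> {
    int value;
    IntValue(int value) { this.value = value; }
    @Override
    public IntValue copy() { return new IntValue(value); }
}

// A reduce-style helper that must retain its inputs. With object reuse the
// runtime may overwrite the passed instance after the call returns, so we
// keep a defensive copy rather than the reused object itself.
class RetainingCollector<T extends Copyable<T>> {
    final List<T> retained = new ArrayList<>();
    void collect(T reused) {
        retained.add(reused.copy());
    }
}
```

Mutating the reused instance after collecting it then no longer corrupts the retained list, which is the failure mode object reuse otherwise causes.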
[jira] [Created] (FLINK-2699) Flink is filling Spark JIRA with incorrect PR links
Patrick Wendell created FLINK-2699: -- Summary: Flink is filling Spark JIRA with incorrect PR links Key: FLINK-2699 URL: https://issues.apache.org/jira/browse/FLINK-2699 Project: Flink Issue Type: Bug Reporter: Patrick Wendell Priority: Blocker I think you guys are using our script for synchronizing JIRA. However, you didn't adjust the target JIRA identifier so it is still posting to Spark. In the past few hours we've seen a lot of random Flink pull requests being linked on the Spark JIRA. This is obviously not desirable for us since they are different projects. The JIRA links are being created by the user "Maximilian Michels" ([~mxm]). https://issues.apache.org/jira/secure/ViewProfile.jspa?name=mxm I saw these as recently as 5 hours ago - but if you've fixed it already go ahead and close this. Thanks. -- This message was sent by Atlassian JIRA (v6.3.4#6332)