[GitHub] flink pull request #2622: [FLINK-3706] Fix YARN test instability
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2622
[jira] [Commented] (FLINK-3706) YARNSessionCapacitySchedulerITCase.testNonexistingQueue unstable
[ https://issues.apache.org/jira/browse/FLINK-3706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568268#comment-15568268 ]

ASF GitHub Bot commented on FLINK-3706:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2622

> YARNSessionCapacitySchedulerITCase.testNonexistingQueue unstable
>
> Key: FLINK-3706
> URL: https://issues.apache.org/jira/browse/FLINK-3706
> Project: Flink
> Issue Type: Bug
> Reporter: Aljoscha Krettek
> Assignee: Robert Metzger
> Priority: Critical
> Labels: test-stability
> Attachments: log.txt
>
> I encountered a failed test on travis.
[jira] [Assigned] (FLINK-4802) distinct() implicitly uses 0th field, when called without a parameter
[ https://issues.apache.org/jira/browse/FLINK-4802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chesnay Schepler reassigned FLINK-4802:
---------------------------------------

    Assignee: Chesnay Schepler

> distinct() implicitly uses 0th field, when called without a parameter
>
> Key: FLINK-4802
> URL: https://issues.apache.org/jira/browse/FLINK-4802
> Project: Flink
> Issue Type: Bug
> Components: Python API
> Reporter: Yakov Goldberg
> Assignee: Chesnay Schepler
>
> Check this code in DataSet.py
> {code}
> def distinct(self, *fields):
>     f = None
>     if len(fields) == 0:
>         f = lambda x: (x,)
>         fields = (0,)
> {code}
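A quick way to see why the implicit fall-back to field 0 matters is to compare deduplication keyed on field 0 with deduplication keyed on the whole record. The sketch below is plain Python with made-up sample tuples, purely for illustration; it is not Flink code:

{code}
# Plain-Python illustration of the semantics at stake in FLINK-4802: keying
# distinct() on field 0 drops records that only differ in later fields, while
# keying on the whole record keeps them.
records = [(1, "a"), (1, "b"), (2, "c")]

# Keyed on field 0: (1, "b") is discarded because it shares key 1 with (1, "a").
seen, by_field0 = set(), []
for r in records:
    if r[0] not in seen:
        seen.add(r[0])
        by_field0.append(r)

# Keyed on the whole tuple: all three records survive, since no two are equal.
by_whole_record = list(dict.fromkeys(records))

print(by_field0)        # [(1, 'a'), (2, 'c')]
print(by_whole_record)  # [(1, 'a'), (1, 'b'), (2, 'c')]
{code}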
[jira] [Assigned] (FLINK-4805) Stringify() crashes with Python3 (run with pyflink3)
[ https://issues.apache.org/jira/browse/FLINK-4805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chesnay Schepler reassigned FLINK-4805:
---------------------------------------

    Assignee: Chesnay Schepler

> Stringify() crashes with Python3 (run with pyflink3)
>
> Key: FLINK-4805
> URL: https://issues.apache.org/jira/browse/FLINK-4805
> Project: Flink
> Issue Type: Bug
> Components: Python API
> Reporter: Yakov Goldberg
> Assignee: Chesnay Schepler
>
> {code}
> Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonMap) terminated prematurely due to an error.
> Traceback (most recent call last):
>   File "/tmp/flink-dist-cache-299804c6-813a-44de-9f62-c5f4cf415990/1527bd1cc45d6f67695c180762c614ef/flink/plan.py", line 548, in <module>
>     env.execute(local=True)
>   File "/tmp/flink-dist-cache-299804c6-813a-44de-9f62-c5f4cf415990/1527bd1cc45d6f67695c180762c614ef/flink/flink/plan/Environment.py", line 181, in execute
>     operator._go()
>   File "/tmp/flink-dist-cache-299804c6-813a-44de-9f62-c5f4cf415990/1527bd1cc45d6f67695c180762c614ef/flink/flink/functions/Function.py", line 64, in _go
>     self._run()
>   File "/tmp/flink-dist-cache-299804c6-813a-44de-9f62-c5f4cf415990/1527bd1cc45d6f67695c180762c614ef/flink/flink/functions/MapFunction.py", line 29, in _run
>     collector.collect(function(value))
>   File "/tmp/flink-dist-cache-299804c6-813a-44de-9f62-c5f4cf415990/1527bd1cc45d6f67695c180762c614ef/flink/flink/plan/DataSet.py", line 38, in map
>     return "(" + b", ".join([self.map(x) for x in value]) + ")"
>   File "/tmp/flink-dist-cache-299804c6-813a-44de-9f62-c5f4cf415990/1527bd1cc45d6f67695c180762c614ef/flink/flink/plan/DataSet.py", line 38, in <listcomp>
>     return "(" + b", ".join([self.map(x) for x in value]) + ")"
>   File "/tmp/flink-dist-cache-299804c6-813a-44de-9f62-c5f4cf415990/1527bd1cc45d6f67695c180762c614ef/flink/flink/plan/DataSet.py", line 38, in map
>     return "(" + b", ".join([self.map(x) for x in value]) + ")"
> TypeError: sequence item 0: expected bytes, bytearray, or an object with the buffer interface, str found
>     at org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:268)
>     at org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
>     at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:98)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
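The TypeError above comes from mixing a bytes separator with str items under Python 3. A minimal, standalone reproduction (not the actual DataSet.py code) and the obvious repair look like this:

{code}
# Python 3: joining str items with a bytes separator raises the TypeError seen above.
parts = ["a", "b"]            # str values, as produced by str(value)

try:
    "(" + b", ".join(parts) + ")"
except TypeError as e:
    print(e)                  # TypeError: cannot join str items with a bytes separator

# Using a str separator works on both Python 2 and Python 3.
print("(" + ", ".join(parts) + ")")   # (a, b)
{code}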
[jira] [Commented] (FLINK-4799) Re-add build-target symlink to project root
[ https://issues.apache.org/jira/browse/FLINK-4799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568290#comment-15568290 ]

ASF GitHub Bot commented on FLINK-4799:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2620

> Re-add build-target symlink to project root
>
> Key: FLINK-4799
> URL: https://issues.apache.org/jira/browse/FLINK-4799
> Project: Flink
> Issue Type: Wish
> Components: Build System
> Affects Versions: 1.2.0, 1.1.3
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Minor
> Fix For: 1.2.0
>
> We have previously removed the plugin which created the 'build-target' link to the build target directory. See FLINK-4732. At least one user has requested to re-add the link.
[jira] [Assigned] (FLINK-4794) partition_by_hash() crashes if no parameter is provided
[ https://issues.apache.org/jira/browse/FLINK-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chesnay Schepler reassigned FLINK-4794:
---------------------------------------

    Assignee: Chesnay Schepler

> partition_by_hash() crashes if no parameter is provided
>
> Key: FLINK-4794
> URL: https://issues.apache.org/jira/browse/FLINK-4794
> Project: Flink
> Issue Type: Bug
> Components: Python API
> Reporter: Yakov Goldberg
> Assignee: Chesnay Schepler
>
> partition_by_hash() crashes if no parameter is provided.
> Looks like a line of code was missed, check distinct():
> {code}
> def distinct(self, *fields):
>     f = None
>     if len(fields) == 0:
>         f = lambda x: (x,)
>         fields = (0,)
> {code}
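For illustration, the missing guard might look like the self-contained sketch below. The class name and the bucketing logic are hypothetical stand-ins, not the actual DataSet.py patch; only the guard itself mirrors the distinct() snippet quoted above.

{code}
# Hypothetical sketch: give partition_by_hash() the same no-arguments fall-back
# that distinct() already has, so calling it without fields keys on the whole record.
class FakeDataSet:
    def __init__(self, data):
        self.data = data

    def partition_by_hash(self, *fields):
        f = None
        if len(fields) == 0:
            f = lambda x: (x,)   # wrap the record so the whole value acts as the key
            fields = (0,)
        key = f if f is not None else (lambda x: tuple(x[i] for i in fields))
        buckets = {}             # stand-in for real hash partitioning
        for record in self.data:
            buckets.setdefault(hash(key(record)) % 2, []).append(record)
        return buckets

print(FakeDataSet([(1, "a"), (2, "b"), (3, "c")]).partition_by_hash())
{code}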
[jira] [Assigned] (FLINK-4795) CsvStringify crashes in case of tuple in tuple, i.e. ("a", True, (1,5))
[ https://issues.apache.org/jira/browse/FLINK-4795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chesnay Schepler reassigned FLINK-4795:
---------------------------------------

    Assignee: Chesnay Schepler

> CsvStringify crashes in case of tuple in tuple, i.e. ("a", True, (1,5))
>
> Key: FLINK-4795
> URL: https://issues.apache.org/jira/browse/FLINK-4795
> Project: Flink
> Issue Type: Bug
> Components: Python API
> Reporter: Yakov Goldberg
> Assignee: Chesnay Schepler
>
> CsvStringify crashes in case of a tuple nested in a tuple, i.e. ("a", True, (1,5)).
> It looks like a typo in CsvStringify._map():
> {code}
> def _map(self, value):
>     if isinstance(value, (tuple, list)):
>         return "(" + b", ".join([self.map(x) for x in value]) + ")"
>     else:
>         return str(value)
> {code}
> self._map() should be called instead of self.map().
> This will also affect write_csv() and read_csv(): write_csv() will work automatically, while read_csv() still needs to be implemented to be able to read the Tuple type.
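A possible corrected version of the method, shown as a standalone sketch rather than the real flink CsvStringify class, recurses via self._map() as the report suggests and joins with a str separator (the bytes-separator issue is tracked separately in FLINK-4805):

{code}
# Standalone sketch of a fixed _map(): recurse with self._map and join str values.
class CsvStringify:
    def _map(self, value):
        if isinstance(value, (tuple, list)):
            return "(" + ", ".join([self._map(x) for x in value]) + ")"
        else:
            return str(value)

print(CsvStringify()._map(("a", True, (1, 5))))   # (a, True, (1, 5))
{code}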
[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568348#comment-15568348 ]

ASF GitHub Bot commented on FLINK-4512:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2608#discussion_r82978898

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---
    @@ -256,29 +306,51 @@ public boolean acknowledgeTask(
         * Aborts a checkpoint because it expired (took too long).
         */
        public void abortExpired() throws Exception {
    --- End diff --

    I would like to do this as a follow up.

> Add option for persistent checkpoints
>
> Key: FLINK-4512
> URL: https://issues.apache.org/jira/browse/FLINK-4512
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their meta data. This is what we currently do for savepoints, but in the future checkpoints and savepoints are likely to diverge with respect to the guarantees they give for updatability, etc.
> This means that the difference between persistent checkpoints and savepoints in the long term will be that persistent checkpoints can only be restored with the same job settings (like parallelism, etc.).
> Regular and persisted checkpoints should behave differently with respect to disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED): regular checkpoints are cleaned up in all of these cases whereas persistent checkpoints only on FINISHED, maybe with the option to customize behaviour on CANCELLED or FAILED.
[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2608#discussion_r82978898

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---
    @@ -256,29 +306,51 @@ public boolean acknowledgeTask(
         * Aborts a checkpoint because it expired (took too long).
         */
        public void abortExpired() throws Exception {
    --- End diff --

    I would like to do this as a follow up.
[jira] [Resolved] (FLINK-4793) Using a local method with :: notation in Java 8 causes index out of bounds
[ https://issues.apache.org/jira/browse/FLINK-4793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther resolved FLINK-4793.
---------------------------------

       Resolution: Fixed
    Fix Version/s: 1.2.0

Fixed in 1dda3ad009667697a620359e997e83a5ba2447dd.

> Using a local method with :: notation in Java 8 causes index out of bounds
>
> Key: FLINK-4793
> URL: https://issues.apache.org/jira/browse/FLINK-4793
> Project: Flink
> Issue Type: Bug
> Reporter: Ted Dunning
> Assignee: Timo Walther
> Fix For: 1.2.0
>
> I tried to use the toString method on an object as a map function:
> {code}
> .map(Trade::toString)
> {code}
> This caused an index out of bounds error:
> {code}
> java.lang.ArrayIndexOutOfBoundsException: -1
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:351)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:305)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:120)
>     at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:506)
>     at com.mapr.aggregate.AggregateTest.testAggregateTrades(AggregateTest.java:81)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
> {code}
> On the other hand, if I use a public static method, like this:
> {code}
> .map(Trade::fromString)
> {code}
> All is good. fromString and toString are defined like this:
> {code}
> public static Trade fromString(String s) throws IOException {
>     return mapper.readValue(s, Trade.class);
> }
>
> @Override
> public String toString() {
>     return String.format("{\"%s\", %d, %d, %.2f}", symbol, time, volume, price);
> }
> {code}
> This might be a viable restriction on what functions I can use, but there certainly should be a better error message, if so.
[GitHub] flink pull request #2599: [FLINK-4746] Make TaskManagerRuntimeInfo an interf...
Github user tillrohrmann closed the pull request at:

    https://github.com/apache/flink/pull/2599
[jira] [Commented] (FLINK-4492) Cleanup files from canceled snapshots
[ https://issues.apache.org/jira/browse/FLINK-4492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568220#comment-15568220 ]

Nikolay Vasilishin commented on FLINK-4492:
-------------------------------------------

This issue has been resolved since [FLINK-3761] (commit 4809f5367b08a9734fc1bd4875be51a9f3bb65aa). Cancelling a job now cleans up the checkpoint directory (via the FsCheckpointStateOutputStream#close() method).

> Cleanup files from canceled snapshots
>
> Key: FLINK-4492
> URL: https://issues.apache.org/jira/browse/FLINK-4492
> Project: Flink
> Issue Type: Bug
> Reporter: Stefan Richter
> Assignee: Nikolay Vasilishin
> Priority: Minor
>
> Current checkpointing only closes CheckpointStateOutputStreams on cancel, but incomplete files are not properly deleted from the filesystem.
[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2608#discussion_r82970716

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -219,33 +245,9 @@ public CheckpointCoordinator(
         * Shuts down the checkpoint coordinator.
         *
         * After this method has been called, the coordinator does not accept
    -    * and further messages and cannot trigger any further checkpoints. All
    -    * checkpoint state is discarded.
    -    */
    -   public void shutdown() throws Exception {
    -       shutdown(true);
    -   }
    -
    -   /**
    -    * Suspends the checkpoint coordinator.
    -    *
    -    * After this method has been called, the coordinator does not accept
         * and further messages and cannot trigger any further checkpoints.
    -    *
    -    * The difference to shutdown is that checkpoint state in the store
    -    * and counter is kept around if possible to recover later.
    -    */
    -   public void suspend() throws Exception {
    -       shutdown(false);
    -   }
    -
    -   /**
    -    * Shuts down the checkpoint coordinator.
    -    *
    -    * @param shutdownStoreAndCounter Depending on this flag the checkpoint
    -    * state services are shut down or suspended.
         */
    -   private void shutdown(boolean shutdownStoreAndCounter) throws Exception {
    +   public void shutdown(JobStatus jobStatus) throws Exception {
    --- End diff --

    True, but I thought we kept it `shutdown` for consistency reasons.
[jira] [Created] (FLINK-4812) Export currentLowWatermark metric also for sources
Robert Metzger created FLINK-4812:
-------------------------------------

        Summary: Export currentLowWatermark metric also for sources
            Key: FLINK-4812
            URL: https://issues.apache.org/jira/browse/FLINK-4812
        Project: Flink
     Issue Type: Improvement
     Components: Metrics
       Reporter: Robert Metzger

As reported by a user, Flink currently does not export the current low watermark for sources (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/currentLowWatermark-metric-not-reported-for-all-tasks-td13770.html).

This JIRA is for adding such a metric for the sources as well.
[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2608#discussion_r82972407

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java ---
    @@ -64,9 +62,9 @@ public void recover() throws Exception {
        @Override
        public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
    -       checkpoints.addLast(checkpoint);
    +       checkpoints.add(checkpoint);
            if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
    -           checkpoints.removeFirst().discardState();
    +           checkpoints.remove().subsume();
    --- End diff --

    Manually triggered savepoints, for example, are not discarded when they are subsumed. The CheckpointProperties constructor is package-private (for testing) and only the static creator methods (for persistent checkpoints, regular checkpoints, and manually triggered savepoints) are publicly accessible. Let me add a check to the properties that only allows manual discard if the checkpoint is persisted.
[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568316#comment-15568316 ]

ASF GitHub Bot commented on FLINK-4512:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2608#discussion_r82976634

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---
    @@ -233,68 +229,90 @@ public int getNumberOfRetainedCheckpoints() {
        }
    
        @Override
    -   public void shutdown() throws Exception {
    -       LOG.info("Shutting down");
    +   public void shutdown(JobStatus jobStatus) throws Exception {
    --- End diff --

    Our only option would be to wrap in our own Exception, because Curator is throwing the general `Exception`.

> Add option for persistent checkpoints
>
> Key: FLINK-4512
> URL: https://issues.apache.org/jira/browse/FLINK-4512
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their meta data. This is what we currently do for savepoints, but in the future checkpoints and savepoints are likely to diverge with respect to the guarantees they give for updatability, etc.
> This means that the difference between persistent checkpoints and savepoints in the long term will be that persistent checkpoints can only be restored with the same job settings (like parallelism, etc.).
> Regular and persisted checkpoints should behave differently with respect to disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED): regular checkpoints are cleaned up in all of these cases whereas persistent checkpoints only on FINISHED, maybe with the option to customize behaviour on CANCELLED or FAILED.
[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2608#discussion_r82976869

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java ---
    @@ -48,13 +48,12 @@ public static CompletedCheckpoint loadAndValidateSavepoint(
            JobID jobId,
            Map<JobVertexID, ExecutionJobVertex> tasks,
    -       SavepointStore savepointStore,
            String savepointPath) throws Exception {
    
        // (1) load the savepoint
    -   Savepoint savepoint = savepointStore.loadSavepoint(savepointPath);
    +   Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath);
        final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size());
    -
    +
        // (2) validate it (parallelism, etc)
        for (TaskState taskState : savepoint.getTaskStates()) {
            ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID());
    --- End diff --

    Yes, updated.
[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568319#comment-15568319 ]

ASF GitHub Bot commented on FLINK-4512:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2608#discussion_r82976869

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java ---
    @@ -48,13 +48,12 @@ public static CompletedCheckpoint loadAndValidateSavepoint(
            JobID jobId,
            Map<JobVertexID, ExecutionJobVertex> tasks,
    -       SavepointStore savepointStore,
            String savepointPath) throws Exception {
    
        // (1) load the savepoint
    -   Savepoint savepoint = savepointStore.loadSavepoint(savepointPath);
    +   Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath);
        final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size());
    -
    +
        // (2) validate it (parallelism, etc)
        for (TaskState taskState : savepoint.getTaskStates()) {
            ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID());
    --- End diff --

    Yes, updated.

> Add option for persistent checkpoints
>
> Key: FLINK-4512
> URL: https://issues.apache.org/jira/browse/FLINK-4512
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their meta data. This is what we currently do for savepoints, but in the future checkpoints and savepoints are likely to diverge with respect to the guarantees they give for updatability, etc.
> This means that the difference between persistent checkpoints and savepoints in the long term will be that persistent checkpoints can only be restored with the same job settings (like parallelism, etc.).
> Regular and persisted checkpoints should behave differently with respect to disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED): regular checkpoints are cleaned up in all of these cases whereas persistent checkpoints only on FINISHED, maybe with the option to customize behaviour on CANCELLED or FAILED.
[jira] [Commented] (FLINK-4717) Naive version of atomic stop signal with savepoint
[ https://issues.apache.org/jira/browse/FLINK-4717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568191#comment-15568191 ]

ASF GitHub Bot commented on FLINK-4717:
---------------------------------------

Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2609

    Thanks for this review, too. Good catch with the broken `stopCheckpointScheduler`. I will add your proposed fix as a separate commit.

> Naive version of atomic stop signal with savepoint
>
> Key: FLINK-4717
> URL: https://issues.apache.org/jira/browse/FLINK-4717
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Affects Versions: 1.2.0
> Reporter: Till Rohrmann
> Priority: Minor
> Fix For: 1.2.0
>
> As a first step towards atomic stopping with savepoints we should implement a cancel command which prior to cancelling takes a savepoint. Additionally, it should turn off the periodic checkpointing so that there won't be checkpoints executed between the savepoint and the cancel command.
[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2608#discussion_r82969876

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -637,20 +637,25 @@ protected int savepoint(String[] args) {
                    "Specify a Job ID to trigger a savepoint."));
            }
    
    -       return triggerSavepoint(options, jobId);
    +       String savepointDirectory = null;
    +       if (cleanedArgs.length == 2) {
    --- End diff --

    Changed the check to `>= 2` and printed a message that some arguments are unneeded.
[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568207#comment-15568207 ]

ASF GitHub Bot commented on FLINK-4512:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2608#discussion_r82969876

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -637,20 +637,25 @@ protected int savepoint(String[] args) {
                    "Specify a Job ID to trigger a savepoint."));
            }
    
    -       return triggerSavepoint(options, jobId);
    +       String savepointDirectory = null;
    +       if (cleanedArgs.length == 2) {
    --- End diff --

    Changed the check to `>= 2` and printed a message that some arguments are unneeded.

> Add option for persistent checkpoints
>
> Key: FLINK-4512
> URL: https://issues.apache.org/jira/browse/FLINK-4512
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their meta data. This is what we currently do for savepoints, but in the future checkpoints and savepoints are likely to diverge with respect to the guarantees they give for updatability, etc.
> This means that the difference between persistent checkpoints and savepoints in the long term will be that persistent checkpoints can only be restored with the same job settings (like parallelism, etc.).
> Regular and persisted checkpoints should behave differently with respect to disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED): regular checkpoints are cleaned up in all of these cases whereas persistent checkpoints only on FINISHED, maybe with the option to customize behaviour on CANCELLED or FAILED.
[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2608#discussion_r82976383

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---
    @@ -172,7 +168,7 @@ public void recover() throws Exception {
            for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) {
                try {
    -               removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
    +               removeSubsumed(initialCheckpoints.get(i));
    --- End diff --

    Yes. Even more, I think this is generally dangerous. What if a checkpoint is recovered but cannot be restored? Then we will have lost all the others. Since we currently only keep a single one anyway, it is not a problem yet.
[jira] [Assigned] (FLINK-4804) Grouping.first() function usage fails
[ https://issues.apache.org/jira/browse/FLINK-4804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chesnay Schepler reassigned FLINK-4804:
---------------------------------------

    Assignee: Chesnay Schepler

> Grouping.first() function usage fails
>
> Key: FLINK-4804
> URL: https://issues.apache.org/jira/browse/FLINK-4804
> Project: Flink
> Issue Type: Bug
> Reporter: Yakov Goldberg
> Assignee: Chesnay Schepler
>
> Trying to use Grouping.first() in the following example:
> {code}
> dd2 = env.from_elements((1, "data"), (1, "hello"), (1, "z"))
> dd2 \
>     .group_by(0) \
>     .sort_group(1, Order.ASCENDING) \
>     .first(2) \
>     .reduce_group(PlainReduce(), combinable=True)
> {code}
> 1. Is this example correct?
> 2. If so, got the following error:
> {code}
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
> Caused by: java.lang.ClassCastException: org.apache.flink.api.java.operators.SortedGrouping cannot be cast to org.apache.flink.api.java.DataSet
>     at org.apache.flink.python.api.PythonPlanBinder.createFirstOperation(PythonPlanBinder.java:470)
>     at org.apache.flink.python.api.PythonPlanBinder.receiveOperations(PythonPlanBinder.java:325)
>     at org.apache.flink.python.api.PythonPlanBinder.receivePlan(PythonPlanBinder.java:236)
>     at org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:140)
>     at org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:113)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> {code}
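For reference, the result the quoted pipeline is after can be mimicked in plain Python. The sketch below only illustrates what "group by field 0, sort each group by field 1 ascending, take the first two" should produce; it says nothing about the ClassCastException in PythonPlanBinder itself:

{code}
# Plain-Python illustration of group_by(0) + sort_group(1, ASCENDING) + first(2).
from itertools import groupby

data = [(1, "data"), (1, "hello"), (1, "z")]
data.sort(key=lambda t: (t[0], t[1]))                 # group key first, then sort key
first_two_per_group = [rec
                       for _, grp in groupby(data, key=lambda t: t[0])
                       for rec in list(grp)[:2]]
print(first_two_per_group)   # [(1, 'data'), (1, 'hello')]
{code}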
[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568313#comment-15568313 ]

ASF GitHub Bot commented on FLINK-4512:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2608#discussion_r82976383

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---
    @@ -172,7 +168,7 @@ public void recover() throws Exception {
            for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) {
                try {
    -               removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
    +               removeSubsumed(initialCheckpoints.get(i));
    --- End diff --

    Yes. Even more, I think this is generally dangerous. What if a checkpoint is recovered but cannot be restored? Then we will have lost all the others. Since we currently only keep a single one anyway, it is not a problem yet.

> Add option for persistent checkpoints
>
> Key: FLINK-4512
> URL: https://issues.apache.org/jira/browse/FLINK-4512
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their meta data. This is what we currently do for savepoints, but in the future checkpoints and savepoints are likely to diverge with respect to the guarantees they give for updatability, etc.
> This means that the difference between persistent checkpoints and savepoints in the long term will be that persistent checkpoints can only be restored with the same job settings (like parallelism, etc.).
> Regular and persisted checkpoints should behave differently with respect to disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED): regular checkpoints are cleaned up in all of these cases whereas persistent checkpoints only on FINISHED, maybe with the option to customize behaviour on CANCELLED or FAILED.
[jira] [Updated] (FLINK-4283) ExecutionGraphRestartTest fails
[ https://issues.apache.org/jira/browse/FLINK-4283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske updated FLINK-4283:
---------------------------------

    Assignee: Alexander Shoshin

> ExecutionGraphRestartTest fails
>
> Key: FLINK-4283
> URL: https://issues.apache.org/jira/browse/FLINK-4283
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.1.0
> Environment: Ubuntu 14.04, W10
> Reporter: Chesnay Schepler
> Assignee: Alexander Shoshin
> Labels: test-stability
>
> I encounter reliable failures for the following tests:
>
> testRestartAutomatically(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)  Time elapsed: 120.089 sec  <<< FAILURE!
> java.lang.AssertionError: expected: but was:
>     at org.junit.Assert.fail(Assert.java:88)
>     at org.junit.Assert.failNotEquals(Assert.java:743)
>     at org.junit.Assert.assertEquals(Assert.java:118)
>     at org.junit.Assert.assertEquals(Assert.java:144)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testRestartAutomatically(ExecutionGraphRestartTest.java:155)
>
> taskShouldNotFailWhenFailureRateLimitWasNotExceeded(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)  Time elapsed: 2.055 sec  <<< FAILURE!
> java.lang.AssertionError: expected: but was:
>     at org.junit.Assert.fail(Assert.java:88)
>     at org.junit.Assert.failNotEquals(Assert.java:743)
>     at org.junit.Assert.assertEquals(Assert.java:118)
>     at org.junit.Assert.assertEquals(Assert.java:144)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.taskShouldNotFailWhenFailureRateLimitWasNotExceeded(ExecutionGraphRestartTest.java:180)
>
> testFailingExecutionAfterRestart(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)  Time elapsed: 120.079 sec  <<< FAILURE!
> java.lang.AssertionError: expected: but was:
>     at org.junit.Assert.fail(Assert.java:88)
>     at org.junit.Assert.failNotEquals(Assert.java:743)
>     at org.junit.Assert.assertEquals(Assert.java:118)
>     at org.junit.Assert.assertEquals(Assert.java:144)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testFailingExecutionAfterRestart(ExecutionGraphRestartTest.java:397)
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2562#discussion_r82988652

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/windows/Session.java ---
    @@ -0,0 +1,48 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.table.windows;
    +
    +import org.apache.flink.api.table.SessionWindow;
    +
    +/**
    + * Helper class for creating a session window. Session windows are ideal for cases where the
    + * window boundaries need to adjust to the incoming data. In a session window it is possible to
    + * have windows that start at individual points in time for each key and that end once there has
    + * been a certain period of inactivity.
    + */
    +public class Session {
    --- End diff --

    During development I had the problem that I had to use `Session$.MODULE$.withGap(10)`, see [1]. But now it is not a companion object anymore, so it works. I will change that.

    [1] http://stackoverflow.com/questions/3282653/how-do-you-call-a-scala-singleton-method-from-java
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2618#discussion_r83005706

    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java ---
    @@ -256,11 +265,11 @@ public int compare(String o1, String o2) {
        }
    
        @Test
    -   public void testFileReadingOperator() throws Exception {
    +   public void testFileReadingOperatorWithEventTime() throws Exception {
            Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
            Map<Integer, String> expectedFileContents = new HashMap<>();
    -       for(int i = 0; i < NO_OF_FILES; i++) {
    -           Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
    +       for (int i = 0; i < NO_OF_FILES; i++) {
    +           Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
    --- End diff --

    This can probably be moved into a `@Before` method.
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2618#discussion_r83006923

    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java ---
    @@ -336,237 +348,294 @@ public int compare(String o1, String o2) {
            Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString());
        }
    
    -   for(org.apache.hadoop.fs.Path file: filesCreated) {
    +   for (org.apache.hadoop.fs.Path file: filesCreated) {
            hdfs.delete(file, false);
        }
    }
    
    -   private static class PathFilter extends FilePathFilter {
    -
    -       @Override
    -       public boolean filterPath(Path filePath) {
    -           return filePath.getName().startsWith("**");
    -       }
    -   }
    +   //              Monitoring Function Tests               //
    
        @Test
        public void testFilePathFiltering() throws Exception {
    -       Set<String> uniqFilesFound = new HashSet<>();
            Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
    +       Set<String> filesKept = new TreeSet<>();
    
            // create the files to be discarded
            for (int i = 0; i < NO_OF_FILES; i++) {
    -           Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "**file", i, "This is test line.");
    +           Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "**file", i, "This is test line.");
                filesCreated.add(file.f0);
            }
    
            // create the files to be kept
            for (int i = 0; i < NO_OF_FILES; i++) {
    -           Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
    +           Tuple2<org.apache.hadoop.fs.Path, String> file =
    +               createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
                filesCreated.add(file.f0);
    +           filesKept.add(file.f0.getName());
            }
    
            TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
            format.setFilesFilter(new PathFilter());
    +
            ContinuousFileMonitoringFunction<String> monitoringFunction =
                new ContinuousFileMonitoringFunction<>(format, hdfsURI, FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
    
    +       final FileVerifyingSourceContext context =
    +           new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction, 0, -1);
    +
            monitoringFunction.open(new Configuration());
    -       monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
    +       monitoringFunction.run(context);
    
    -       Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
    -       for(int i = 0; i < NO_OF_FILES; i++) {
    -           org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
    -           Assert.assertTrue(uniqFilesFound.contains(file.toString()));
    -       }
    +       Assert.assertArrayEquals(filesKept.toArray(), context.getSeenFiles().toArray());
    
    -       for(org.apache.hadoop.fs.Path file: filesCreated) {
    +       // finally delete the files created for the test.
    +       for (org.apache.hadoop.fs.Path file: filesCreated) {
                hdfs.delete(file, false);
    --- End diff --

    This could be moved into an `@After` method.
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2618#discussion_r83022754

    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java ---
    @@ -257,74 +190,158 @@ public void open(Configuration parameters) throws Exception {
            long failurePosMax = (long) (0.7 * LINES_PER_FILE);
    
            elementsToFailure = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
    -
    -       if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
    -           finalCollectedContent = new HashMap<>();
    -           for (Map.Entry<Integer, Set<String>> result: collectedContent.entrySet()) {
    -               finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue()));
    -           }
    -           throw new SuccessException();
    -       }
    -   }
    -
    -   @Override
    -   public void close() {
    -       try {
    -           super.close();
    -       } catch (Exception e) {
    -           e.printStackTrace();
    -       }
        }
    
        @Override
        public void invoke(String value) throws Exception {
    -       int fileIdx = Character.getNumericValue(value.charAt(0));
    +       int fileIdx = getFileIdx(value);
    
    -       Set<String> content = collectedContent.get(fileIdx);
    +       Set<String> content = actualContent.get(fileIdx);
            if (content == null) {
                content = new HashSet<>();
    -           collectedContent.put(fileIdx, content);
    +           actualContent.put(fileIdx, content);
            }
    
    +       // detect duplicate lines.
            if (!content.add(value + "\n")) {
                fail("Duplicate line: " + value);
                System.exit(0);
            }
    -
            elementCounter++;
    +
    +       // this is termination
            if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
    -           finalCollectedContent = new HashMap<>();
    -           for (Map.Entry<Integer, Set<String>> result: collectedContent.entrySet()) {
    -               finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue()));
    -           }
    +           actualCollectedContent = actualContent;
                throw new SuccessException();
            }
    
    -       count++;
    -       if (!hasFailed) {
    +       // add some latency so that we have at least one checkpoint in
    +       if (!hasFailed && successfulCheckpoints == 0) {
                Thread.sleep(2);
    --- End diff --

    The waiting time seems arbitrary. Is it required for the next step (see below)?
[jira] [Commented] (FLINK-4604) Add support for standard deviation/variance
[ https://issues.apache.org/jira/browse/FLINK-4604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15569021#comment-15569021 ]

Timo Walther commented on FLINK-4604:
-------------------------------------

You cannot use {{!AggregateReduceFunctionsRule.INSTANCE.matches(call)}}, as it is always false for sums. See {{AggregateReduceFunctionsRule.containsAvgStddevVarCall}}. What do you mean by "something went wrong"? What about this:

{code}
val supported = agg.getAggCallList.map(_.getAggregation.getKind).forall {
  case SqlKind.SUM => true
  case SqlKind.MIN => true
  case SqlKind.MAX => true
  case _ => false
}

!distinctAggs && !groupSets && !agg.indicator && supported
{code}

> Add support for standard deviation/variance
>
> Key: FLINK-4604
> URL: https://issues.apache.org/jira/browse/FLINK-4604
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Anton Mushin
> Attachments: 1.jpg
>
> Calcite's {{AggregateReduceFunctionsRule}} can convert SQL {{AVG, STDDEV_POP, STDDEV_SAMP, VAR_POP, VAR_SAMP}} to sum/count functions. We should add, test and document this rule.
> Whether we also want to add these aggregates to the Table API is up for discussion.
[GitHub] flink issue #2608: [FLINK-4512] [FLIP-10] Add option to persist periodic che...
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2608

    +1 to the general approach and the code.

    Some suggestions for name polishing:
      - How about renaming `DISCARD_ON_CANCELLATION` to `DELETE_ON_CANCELLATION`? That would sound more explicit, like "cleanup" and actual file deletion.
      - Since all checkpoints are persistent (at least in HA), how about calling this `enableExternalizedCheckpoints()` rather than `enablePersistentCheckpoints()`?
      - I would suggest to drop the method `enablePersistentCheckpoints()` without a cleanup policy parameter. Whoever enables that feature should explicitly think about what cleanup policy they want.

    For the future, can we get rid of the extra storage location for the externalized checkpoint metadata and simply store it in the checkpoint directory as well? That makes it simpler for users to track and clean up checkpoints manually, if they want to retain externalized checkpoints across cancellations and terminal failures.
      - Both the config value and the location parameter to `enablePersistentCheckpoints()` would be dropped.
      - That would imply that every state backend needs to be able to provide a storage location for the checkpoint metadata.
      - The memory state backend would hence not work with externalized checkpoints, unless one explicitly sets a parameter `setExternalizedCheckpointsLocation(uri)`.

    Since this is a bigger change, I would suggest a follow-up pull request for that. The only change I would make to this pull request (to make the transition to the follow-up smoother) is to remove the path parameter from the `enablePersistentCheckpoints()` methods and always use the configuration value (which will be replaced by the state backend's storage location).
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2562#discussion_r82990268

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/properties.scala ---
    @@ -0,0 +1,57 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.expressions
    +
    +import org.apache.calcite.rex.RexNode
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
    +import org.apache.flink.api.table.FlinkRelBuilder.NamedProperty
    +import org.apache.flink.api.table.validate.{ValidationFailure, ValidationSuccess}
    +
    +abstract class Property(child: Expression) extends UnaryExpression {
    +
    +  override def toString = s"Property($child)"
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
    +    throw new UnsupportedOperationException("Property cannot be transformed to RexNode.")
    +
    +  override private[flink] def validateInput() =
    +    if (child.isInstanceOf[WindowReference]) {
    --- End diff --

    I just tried to keep the names short because the Scala line-length limits are pretty strict. Done.
[jira] [Closed] (FLINK-4806) ResourceManager stop listening JobManager's leader address
[ https://issues.apache.org/jira/browse/FLINK-4806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kurt Young closed FLINK-4806.
-----------------------------

    Resolution: Won't Fix

> ResourceManager stop listening JobManager's leader address
>
> Key: FLINK-4806
> URL: https://issues.apache.org/jira/browse/FLINK-4806
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Kurt Young
>
> Currently in the flip-6 branch, when the RM receives a registration from a JM, it verifies the JM's leader session id and attaches a JobManagerLeaderListener to it for monitoring future changes.
> Maybe we can simplify this a bit. Instead of monitoring the JM's leadership changes, we verify the leadership once when the JM registers itself, write down the leader id of the registered JM for future RPC filtering, and start the heartbeat monitor with the JM.
> If the JM's leadership changes, the new JM will register itself; the RM verifies its leadership when it receives the registration and can decide whether to accept or reject it. The JM's information in the RM is thus preempted only by a new JM, not by the RM reacting to a leadership-change listener. This keeps the logic inside the RM simple and avoids any error handling around the leader listener.
[jira] [Commented] (FLINK-4787) Add REST API call for cancel-with-savepoints
[ https://issues.apache.org/jira/browse/FLINK-4787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568599#comment-15568599 ]

ASF GitHub Bot commented on FLINK-4787:
---------------------------------------

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/2626

    [FLINK-4787] [runtime-web] Expose cancel-with-savepoint via REST API

    Follow up to #2609, exposing the cancel-with-savepoint command via the REST API. The relevant commits are the last two.

    The `RequestHandler` now returns a generic `HttpResponse` instead of a `String`. This enables handlers to return custom responses (different HTTP codes, etc.). Most handlers now extend the `AbstractJsonRequestHandler` for default JSON responses (which used to be handled by the generic `RequestHandler`).

    Adds handlers for triggering and monitoring a job cancellation with savepoints. Since this operation can take some time, we do this asynchronously. According to various online resources, the way to go for REST APIs in such cases is to return HTTP 201 accepted with the location of the in-progress operation.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 4787-cancel_with_savepoint_rest_api

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2626.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2626

commit 99a9621383e6a223e39e4ec22d60671a205d958d
Author: Ufuk Celebi
Date:   2016-10-06T14:43:42Z

    [FLINK-4512] [FLIP-10] Add option to persist periodic checkpoints
    [FLINK-4509] [FLIP-10] Specify savepoint directory per savepoint
    [FLINK-4507] [FLIP-10] Deprecate savepoint backend config

commit 25ffc04d7e5d7ef9538447ee5162c0d203e96e89
Author: Ufuk Celebi
Date:   2016-10-07T09:48:47Z

    [FLINK-4717] Add CancelJobWithSavepoint

    - Adds CancelJobWithSavepoint message, which triggers a savepoint before cancelling the respective job.
    - Adds -s [targetDirectory] option to the CLI cancel command:
      * bin/flink cancel <jobID> (regular cancelling)
      * bin/flink cancel -s <jobID> (cancel with savepoint to default dir)
      * bin/flink cancel -s <targetDir> <jobID> (cancel with savepoint to targetDir)

commit bc88dba90a263e80691448e20644e5f126551bb6
Author: Ufuk Celebi
Date:   2016-10-11T08:08:14Z

    [FLINK-4787] [runtime-web] Return generic HttpResponse in RequestHandler

    - Let RequestHandler return a generic HttpResponse instead of a String. This enables handlers to return custom responses (different http codes, etc.)
    - Introduce AbstractJsonRequestHandler for default JSON responses

commit ecbcf46f5a9d874dbdd908d48c7035c1cb338c1a
Author: Ufuk Celebi
Date:   2016-10-11T08:09:20Z

    [FLINK-4787] [runtime-web] Add JobCancellationWithSavepointHandlers

    - Add handlers for triggering and monitoring job cancellation with savepoints.

> Add REST API call for cancel-with-savepoints
>
> Key: FLINK-4787
> URL: https://issues.apache.org/jira/browse/FLINK-4787
> Project: Flink
> Issue Type: Improvement
> Components: Webfrontend
> Reporter: Ufuk Celebi
>
> As a follow up to FLINK-4717, expose the cancel-with-savepoint command via the REST API:
> {code}
> /jobs/:jobid/cancel-with-savepoint/
> /jobs/:jobid/cancel-with-savepoint/:targetDirectory
> {code}
> The first command goes to the default savepoint directory, the second one uses the given target directory.
> The calls need to be async, as triggering a savepoint can take some time. For this, the handlers return a {{201 (Accepted)}} response with the location of the status, e.g. {{/jobs/:jobid/cancel-with-savepoint/in-progress/:id}}. The user has to check that location until the final savepoint path is returned.
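A client driving this asynchronous API might look like the Python sketch below. Only the endpoint paths and the 201-then-poll flow come from the issue text; the host/port, the use of a Location header, the response body format, and the polling interval are assumptions for illustration:

{code}
import time
import requests

BASE = "http://localhost:8081"                  # assumed JobManager web frontend address
JOB_ID = "0123456789abcdef0123456789abcdef"     # placeholder job id

# Trigger cancellation with a savepoint to the default savepoint directory.
resp = requests.get(BASE + "/jobs/" + JOB_ID + "/cancel-with-savepoint/")
print(resp.status_code)                         # 201 expected, per the issue description

# Assumed: the response points at the in-progress location, e.g.
# /jobs/:jobid/cancel-with-savepoint/in-progress/:id
status_url = BASE + resp.headers.get("Location", "")

# Poll the in-progress location until the final savepoint path is returned.
for _ in range(60):
    status = requests.get(status_url)
    if status.status_code == 200:
        print("final response:", status.text)   # should contain the savepoint path
        break
    time.sleep(1)
{code}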
[jira] [Commented] (FLINK-4283) ExecutionGraphRestartTest fails
[ https://issues.apache.org/jira/browse/FLINK-4283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568940#comment-15568940 ]

Alexander Shoshin commented on FLINK-4283:
-------------------------------------------

It is still relevant. I have just started researching it. These tests finish successfully when run separately but fail when run together with the other tests from the same class. I will write here when I find the cause.

> ExecutionGraphRestartTest fails
>
> Key: FLINK-4283
> URL: https://issues.apache.org/jira/browse/FLINK-4283
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.1.0
> Environment: Ubuntu 14.04, W10
> Reporter: Chesnay Schepler
> Assignee: Alexander Shoshin
> Labels: test-stability
>
> I encounter reliable failures for the following tests:
>
> testRestartAutomatically(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)  Time elapsed: 120.089 sec  <<< FAILURE!
> java.lang.AssertionError: expected: but was:
>     at org.junit.Assert.fail(Assert.java:88)
>     at org.junit.Assert.failNotEquals(Assert.java:743)
>     at org.junit.Assert.assertEquals(Assert.java:118)
>     at org.junit.Assert.assertEquals(Assert.java:144)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testRestartAutomatically(ExecutionGraphRestartTest.java:155)
>
> taskShouldNotFailWhenFailureRateLimitWasNotExceeded(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)  Time elapsed: 2.055 sec  <<< FAILURE!
> java.lang.AssertionError: expected: but was:
>     at org.junit.Assert.fail(Assert.java:88)
>     at org.junit.Assert.failNotEquals(Assert.java:743)
>     at org.junit.Assert.assertEquals(Assert.java:118)
>     at org.junit.Assert.assertEquals(Assert.java:144)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.taskShouldNotFailWhenFailureRateLimitWasNotExceeded(ExecutionGraphRestartTest.java:180)
>
> testFailingExecutionAfterRestart(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)  Time elapsed: 120.079 sec  <<< FAILURE!
> java.lang.AssertionError: expected: but was:
>     at org.junit.Assert.fail(Assert.java:88)
>     at org.junit.Assert.failNotEquals(Assert.java:743)
>     at org.junit.Assert.assertEquals(Assert.java:118)
>     at org.junit.Assert.assertEquals(Assert.java:144)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testFailingExecutionAfterRestart(ExecutionGraphRestartTest.java:397)
[jira] [Commented] (FLINK-4803) Job Cancel can hang forever waiting for OutputFormat.close()
[ https://issues.apache.org/jira/browse/FLINK-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15569025#comment-15569025 ] Shannon Carey commented on FLINK-4803: -- Yes, that's right. cancel() blocks on close(), and therefore if close() misbehaves the thread is never interrupted and cancel() blocks forever. In the issue description, I suggested your option #2. I think you'll want #1 no matter what. However, #2 allows for at least one message and/or exception to be logged that tells the user what went wrong (why their job is taking a long time to cancel, or why it did not cancel gracefully). I'm not sure what your DataSink-specific option would look like. Maybe it is similar to my workaround, where I wrapped my HadoopOutputFormat in a subclass that calls super.close() from a separate thread with a timeout? That workaround is ok, but I had to expend a fair amount of effort to figure out what the problem was, and also there was nothing I could do but restart Flink in order to get my job to terminate (not a desirable solution). You'll want Flink to function smoothly regardless of what data sink the user chooses. > Job Cancel can hang forever waiting for OutputFormat.close() > > > Key: FLINK-4803 > URL: https://issues.apache.org/jira/browse/FLINK-4803 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.1 >Reporter: Shannon Carey > > If the Flink job uses a badly-behaved OutputFormat (in this example, a > HadoopOutputFormat containing a CqlBulkOutputFormat), where the close() > method blocks forever, it is impossible to cancel the Flink job even though > the blocked thread would respond to an interrupt. The stack traces below show > the state of the important threads when a job is canceled and the > OutputFormat is blocking forever inside of close(). > I suggest that `DataSinkTask.cancel()` method be updated to add a timeout on > `this.format.close()`. When the timeout is reached, the Task thread should be > interrupted. 
> {code} > "Canceler for DataSink > (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" > #6422 daemon prio=5 os_prio=0 tid=0x7fb7e42f nid=0x34f3 waiting for > monitor entry [0x7fb7be079000] >java.lang.Thread.State: BLOCKED (on object monitor) > at > org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158) > - waiting to lock <0x0006bae5f788> (a java.lang.Object) > at > org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268) > at > org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149) > at java.lang.Thread.run(Thread.java:745) > "DataSink > (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" > #6410 daemon prio=5 os_prio=0 tid=0x7fb7e79a4800 nid=0x2ad8 waiting on > condition [0x7fb7bdf78000] >java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0006c5ab5e20> (a > java.util.concurrent.SynchronousQueue$TransferStack) > at > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) > at > java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460) > at > java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362) > at > java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895) > at > org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194) > at > org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180) > at > org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156) > at > org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275) > at > org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133) > at > org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126) > at > org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158) > - locked <0x0006bae5f788> (a java.lang.Object) > at > org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15569028#comment-15569028 ] ASF GitHub Bot commented on FLINK-4512: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2608 +1 to the general approach and the code. Some suggestions for name polishing: - How about renaming `DISCARD_ON_CANCELLATION` to `DELETE_ON_CANCELLATION`? That would sound more explicit, like "cleanup" and actual file deletion. - Since all checkpoints are persistent (at least in HA), how about calling this `enableExternalizedCheckpoints()` rather than `enablePersistentCheckpoints()`? - I would suggest dropping the method `enablePersistentCheckpoints()` without a cleanup policy parameter. Whoever enables that feature should explicitly think about what cleanup policy they want. For the future, can we get rid of the extra storage location for the externalized checkpoint metadata? Simply store it as well in the checkpoint directory? That makes it simpler for users to track and clean up checkpoints manually, if they want to retain externalized checkpoints across cancellations and terminal failures. - Both the config value and the location parameter to the `enablePersistentCheckpoints()` would be dropped. - That would imply that every state backend needs to be able to provide a storage location for the checkpoint metadata - The memory state backend would hence not work with externalized checkpoints, unless one explicitly sets a parameter `setExternalizedCheckpointsLocation(uri)`. Since this is a bigger change, I would suggest a followup pull request for that. The only change I would make to this pull request (to make the transition to the followup smoother) is to remove the path parameter from the `enablePersistentCheckpoints()` methods and always use the configuration value (which will be replaced by the state backend's storage location). > Add option for persistent checkpoints > - > > Key: FLINK-4512 > URL: https://issues.apache.org/jira/browse/FLINK-4512 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > > Allow periodic checkpoints to be persisted by writing out their meta data. > This is what we currently do for savepoints, but in the future checkpoints > and savepoints are likely to diverge with respect to guarantees they give for > updatability, etc. > This means that the difference between persistent checkpoints and savepoints > in the long term will be that persistent checkpoints can only be restored > with the same job settings (like parallelism, etc.) > Regular and persisted checkpoints should behave differently with respect to > disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED): > regular checkpoints are cleaned up in all of these cases whereas persistent > checkpoints only on FINISHED. Maybe with the option to customize behaviour on > CANCELLED or FAILED. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2627: Kafka 0.10 follow-up fixes
GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/2627 Kafka 0.10 follow-up fixes After merging https://github.com/apache/flink/pull/2369, there was some follow-up feedback in the final commit: https://github.com/apache/flink/commit/6731ec1e48d0a0092dd2330adda73bcf37fda8d7#commitcomment-19375265, which I'm addressing with this pull request: - I'm adding a `Kafka010FetcherTest`, which is based on the `Kafka09FetcherTest`. - I'm undoing the changes in `DataGenerators` to avoid starting a full Flink job to produce some data into a topic. - I'm removing some commented-out code that has been part of the Kafka code for a while. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/flink kafka_inline_producer Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2627.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2627 commit 744f8ebb66b2a7288942be139cd7a7e6d1170c80 Author: Robert Metzger Date: 2016-10-11T13:48:32Z [hotfix][kafka] Undo DataGenerators changes (use inline kafka producer again) commit ce26248ad0a4672eeb556863a061a478987694e9 Author: Robert Metzger Date: 2016-10-12T12:03:01Z [hotfix][kafka] Backport Kafka09FetcherTest for Kafka010 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-4635) Implement Data Transfer Authentication using shared secret configuration
[ https://issues.apache.org/jira/browse/FLINK-4635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4635: -- Issue Type: Sub-task (was: Task) Parent: FLINK-3930 > Implement Data Transfer Authentication using shared secret configuration > > > Key: FLINK-4635 > URL: https://issues.apache.org/jira/browse/FLINK-4635 > Project: Flink > Issue Type: Sub-task >Reporter: Vijay Srinivasaraghavan >Assignee: Vijay Srinivasaraghavan > > The data transfer authentication (TM/Netty) requirement was not addressed as > part of FLINK-3930 and this JIRA is created to track the issue. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r83017556 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java --- @@ -336,237 +348,294 @@ public int compare(String o1, String o2) { Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString()); } - for(org.apache.hadoop.fs.Path file: filesCreated) { + for (org.apache.hadoop.fs.Path file: filesCreated) { hdfs.delete(file, false); } } - private static class PathFilter extends FilePathFilter { - - @Override - public boolean filterPath(Path filePath) { - return filePath.getName().startsWith("**"); - } - } + Monitoring Function Tests // @Test public void testFilePathFiltering() throws Exception { - Set uniqFilesFound = new HashSet<>(); Set filesCreated = new HashSet<>(); + Set filesKept = new TreeSet<>(); // create the files to be discarded for (int i = 0; i < NO_OF_FILES; i++) { - Tuple2file = fillWithData(hdfsURI, "**file", i, "This is test line."); + Tuple2 file = createFileAndFillWithData(hdfsURI, "**file", i, "This is test line."); filesCreated.add(file.f0); } // create the files to be kept for (int i = 0; i < NO_OF_FILES; i++) { - Tuple2 file = fillWithData(hdfsURI, "file", i, "This is test line."); + Tuple2 file = + createFileAndFillWithData(hdfsURI, "file", i, "This is test line."); filesCreated.add(file.f0); + filesKept.add(file.f0.getName()); } TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); format.setFilesFilter(new PathFilter()); + ContinuousFileMonitoringFunction monitoringFunction = new ContinuousFileMonitoringFunction<>(format, hdfsURI, FileProcessingMode.PROCESS_ONCE, 1, INTERVAL); + final FileVerifyingSourceContext context = + new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction, 0, -1); + monitoringFunction.open(new Configuration()); - monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound)); + monitoringFunction.run(context); - Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size()); - for(int i = 0; i < NO_OF_FILES; i++) { - org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i); - Assert.assertTrue(uniqFilesFound.contains(file.toString())); - } + Assert.assertArrayEquals(filesKept.toArray(), context.getSeenFiles().toArray()); - for(org.apache.hadoop.fs.Path file: filesCreated) { + // finally delete the files created for the test. + for (org.apache.hadoop.fs.Path file: filesCreated) { hdfs.delete(file, false); } } + private static class PathFilter extends FilePathFilter { + @Override + public boolean filterPath(Path filePath) { + return filePath.getName().startsWith("**"); + } + } + @Test - public void testFileSplitMonitoringReprocessWithAppended() throws Exception { - final Set uniqFilesFound = new HashSet<>(); + public void testSortingOnModTime() throws Exception { + final long[] modTimes = new long[NO_OF_FILES]; + final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES]; + + // create some files + for (int i = 0; i < NO_OF_FILES; i++) { + Tuple2 file = + createFileAndFillWithData(hdfsURI, "file", i, "This is test line."); + Thread.sleep(10); + + filesCreated[i] = file.f0; + modTimes[i] = hdfs.getFileStatus(file.f0).getModificationTime(); + } + + TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); +
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r83014810 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java --- @@ -336,237 +348,294 @@ public int compare(String o1, String o2) { Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString()); } - for(org.apache.hadoop.fs.Path file: filesCreated) { + for (org.apache.hadoop.fs.Path file: filesCreated) { hdfs.delete(file, false); } } - private static class PathFilter extends FilePathFilter { - - @Override - public boolean filterPath(Path filePath) { - return filePath.getName().startsWith("**"); - } - } + Monitoring Function Tests // @Test public void testFilePathFiltering() throws Exception { - Set uniqFilesFound = new HashSet<>(); Set filesCreated = new HashSet<>(); + Set filesKept = new TreeSet<>(); // create the files to be discarded for (int i = 0; i < NO_OF_FILES; i++) { - Tuple2file = fillWithData(hdfsURI, "**file", i, "This is test line."); + Tuple2 file = createFileAndFillWithData(hdfsURI, "**file", i, "This is test line."); filesCreated.add(file.f0); } // create the files to be kept for (int i = 0; i < NO_OF_FILES; i++) { - Tuple2 file = fillWithData(hdfsURI, "file", i, "This is test line."); + Tuple2 file = + createFileAndFillWithData(hdfsURI, "file", i, "This is test line."); filesCreated.add(file.f0); + filesKept.add(file.f0.getName()); } TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); format.setFilesFilter(new PathFilter()); + ContinuousFileMonitoringFunction monitoringFunction = new ContinuousFileMonitoringFunction<>(format, hdfsURI, FileProcessingMode.PROCESS_ONCE, 1, INTERVAL); + final FileVerifyingSourceContext context = + new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction, 0, -1); + monitoringFunction.open(new Configuration()); - monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound)); + monitoringFunction.run(context); - Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size()); - for(int i = 0; i < NO_OF_FILES; i++) { - org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i); - Assert.assertTrue(uniqFilesFound.contains(file.toString())); - } + Assert.assertArrayEquals(filesKept.toArray(), context.getSeenFiles().toArray()); - for(org.apache.hadoop.fs.Path file: filesCreated) { + // finally delete the files created for the test. + for (org.apache.hadoop.fs.Path file: filesCreated) { hdfs.delete(file, false); } } + private static class PathFilter extends FilePathFilter { + @Override + public boolean filterPath(Path filePath) { + return filePath.getName().startsWith("**"); + } + } + @Test - public void testFileSplitMonitoringReprocessWithAppended() throws Exception { - final Set uniqFilesFound = new HashSet<>(); + public void testSortingOnModTime() throws Exception { + final long[] modTimes = new long[NO_OF_FILES]; + final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES]; + + // create some files + for (int i = 0; i < NO_OF_FILES; i++) { + Tuple2 file = + createFileAndFillWithData(hdfsURI, "file", i, "This is test line."); + Thread.sleep(10); + + filesCreated[i] = file.f0; + modTimes[i] = hdfs.getFileStatus(file.f0).getModificationTime(); + } + + TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); +
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r83007178 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java --- @@ -336,237 +348,294 @@ public int compare(String o1, String o2) { Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString()); } - for(org.apache.hadoop.fs.Path file: filesCreated) { + for (org.apache.hadoop.fs.Path file: filesCreated) { hdfs.delete(file, false); } } - private static class PathFilter extends FilePathFilter { - - @Override - public boolean filterPath(Path filePath) { - return filePath.getName().startsWith("**"); - } - } + Monitoring Function Tests // @Test public void testFilePathFiltering() throws Exception { - Set uniqFilesFound = new HashSet<>(); Set filesCreated = new HashSet<>(); + Set filesKept = new TreeSet<>(); // create the files to be discarded for (int i = 0; i < NO_OF_FILES; i++) { - Tuple2file = fillWithData(hdfsURI, "**file", i, "This is test line."); + Tuple2 file = createFileAndFillWithData(hdfsURI, "**file", i, "This is test line."); filesCreated.add(file.f0); } // create the files to be kept for (int i = 0; i < NO_OF_FILES; i++) { - Tuple2 file = fillWithData(hdfsURI, "file", i, "This is test line."); + Tuple2 file = + createFileAndFillWithData(hdfsURI, "file", i, "This is test line."); filesCreated.add(file.f0); + filesKept.add(file.f0.getName()); } TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); format.setFilesFilter(new PathFilter()); + ContinuousFileMonitoringFunction monitoringFunction = new ContinuousFileMonitoringFunction<>(format, hdfsURI, FileProcessingMode.PROCESS_ONCE, 1, INTERVAL); + final FileVerifyingSourceContext context = + new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction, 0, -1); + monitoringFunction.open(new Configuration()); - monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound)); + monitoringFunction.run(context); - Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size()); - for(int i = 0; i < NO_OF_FILES; i++) { - org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i); - Assert.assertTrue(uniqFilesFound.contains(file.toString())); - } + Assert.assertArrayEquals(filesKept.toArray(), context.getSeenFiles().toArray()); - for(org.apache.hadoop.fs.Path file: filesCreated) { + // finally delete the files created for the test. + for (org.apache.hadoop.fs.Path file: filesCreated) { hdfs.delete(file, false); } } + private static class PathFilter extends FilePathFilter { --- End diff -- You moved this class but really you're only using it in one test case and can simply make it an anonymous function. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r83019658 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java --- @@ -107,10 +113,10 @@ public ContinuousFileMonitoringFunction( @Override @SuppressWarnings("unchecked") public void open(Configuration parameters) throws Exception { - LOG.info("Opening File Monitoring Source."); - super.open(parameters); format.configure(parameters); + + LOG.info("Opening File Monitoring Source for path: " + path + "."); --- End diff -- I think this doesn't have to be logged at INFO level. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r83016929 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java --- @@ -336,237 +348,294 @@ public int compare(String o1, String o2) { Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString()); } - for(org.apache.hadoop.fs.Path file: filesCreated) { + for (org.apache.hadoop.fs.Path file: filesCreated) { hdfs.delete(file, false); } } - private static class PathFilter extends FilePathFilter { - - @Override - public boolean filterPath(Path filePath) { - return filePath.getName().startsWith("**"); - } - } + Monitoring Function Tests // @Test public void testFilePathFiltering() throws Exception { - Set uniqFilesFound = new HashSet<>(); Set filesCreated = new HashSet<>(); + Set filesKept = new TreeSet<>(); // create the files to be discarded for (int i = 0; i < NO_OF_FILES; i++) { - Tuple2file = fillWithData(hdfsURI, "**file", i, "This is test line."); + Tuple2 file = createFileAndFillWithData(hdfsURI, "**file", i, "This is test line."); filesCreated.add(file.f0); } // create the files to be kept for (int i = 0; i < NO_OF_FILES; i++) { - Tuple2 file = fillWithData(hdfsURI, "file", i, "This is test line."); + Tuple2 file = + createFileAndFillWithData(hdfsURI, "file", i, "This is test line."); filesCreated.add(file.f0); + filesKept.add(file.f0.getName()); } TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); format.setFilesFilter(new PathFilter()); + ContinuousFileMonitoringFunction monitoringFunction = new ContinuousFileMonitoringFunction<>(format, hdfsURI, FileProcessingMode.PROCESS_ONCE, 1, INTERVAL); + final FileVerifyingSourceContext context = + new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction, 0, -1); + monitoringFunction.open(new Configuration()); - monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound)); + monitoringFunction.run(context); - Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size()); - for(int i = 0; i < NO_OF_FILES; i++) { - org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i); - Assert.assertTrue(uniqFilesFound.contains(file.toString())); - } + Assert.assertArrayEquals(filesKept.toArray(), context.getSeenFiles().toArray()); - for(org.apache.hadoop.fs.Path file: filesCreated) { + // finally delete the files created for the test. + for (org.apache.hadoop.fs.Path file: filesCreated) { hdfs.delete(file, false); } } + private static class PathFilter extends FilePathFilter { + @Override + public boolean filterPath(Path filePath) { + return filePath.getName().startsWith("**"); + } + } + @Test - public void testFileSplitMonitoringReprocessWithAppended() throws Exception { - final Set uniqFilesFound = new HashSet<>(); + public void testSortingOnModTime() throws Exception { + final long[] modTimes = new long[NO_OF_FILES]; + final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES]; + + // create some files + for (int i = 0; i < NO_OF_FILES; i++) { + Tuple2 file = + createFileAndFillWithData(hdfsURI, "file", i, "This is test line."); + Thread.sleep(10); + + filesCreated[i] = file.f0; + modTimes[i] = hdfs.getFileStatus(file.f0).getModificationTime(); + } + + TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); +
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r83005668 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java --- @@ -111,8 +114,8 @@ public void testFileReadingOperatorWithIngestionTime() throws Exception { Set filesCreated = new HashSet<>(); MapexpectedFileContents = new HashMap<>(); - for(int i = 0; i < NO_OF_FILES; i++) { - Tuple2 file = fillWithData(hdfsURI, "file", i, "This is test line."); + for (int i = 0; i < NO_OF_FILES; i++) { + Tuple2 file = createFileAndFillWithData(hdfsURI, "file", i, "This is test line."); --- End diff -- This can probably be moved into a `@Before` method. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
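For readers less familiar with JUnit fixtures, the suggestion boils down to the pattern below: build the shared inputs once per test in a `@Before` method so every test body can rely on them. This is a self-contained toy; the real test class would call its `createFileAndFillWithData(hdfsURI, ...)` helper instead of the string stand-in used here.

```java
import static org.junit.Assert.assertEquals;

import java.util.HashSet;
import java.util.Set;

import org.junit.Before;
import org.junit.Test;

/** Minimal illustration of moving repeated per-test setup into a @Before method. */
public class BeforeMethodSketchTest {

    private static final int NO_OF_FILES = 10; // mirrors the constant in the test under review

    private Set<String> filesCreated;

    @Before
    public void createTestFiles() {
        filesCreated = new HashSet<>();
        for (int i = 0; i < NO_OF_FILES; i++) {
            // stand-in for: createFileAndFillWithData(hdfsURI, "file", i, "This is test line.")
            filesCreated.add("file" + i);
        }
    }

    @Test
    public void everyTestSeesThePreparedFiles() {
        assertEquals(NO_OF_FILES, filesCreated.size());
    }
}
```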
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r83019574 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java --- @@ -295,7 +256,7 @@ public void close() throws Exception { globalModificationTime = Long.MAX_VALUE; isRunning = false; } - LOG.info("Closed File Monitoring Source."); + LOG.info("Closing File Monitoring Source for path: " + path + "."); --- End diff -- Should we really log this at info level? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r83016682 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java --- @@ -336,237 +348,294 @@ public int compare(String o1, String o2) { Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString()); } - for(org.apache.hadoop.fs.Path file: filesCreated) { + for (org.apache.hadoop.fs.Path file: filesCreated) { hdfs.delete(file, false); } } - private static class PathFilter extends FilePathFilter { - - @Override - public boolean filterPath(Path filePath) { - return filePath.getName().startsWith("**"); - } - } + Monitoring Function Tests // @Test public void testFilePathFiltering() throws Exception { - Set uniqFilesFound = new HashSet<>(); Set filesCreated = new HashSet<>(); + Set filesKept = new TreeSet<>(); // create the files to be discarded for (int i = 0; i < NO_OF_FILES; i++) { - Tuple2file = fillWithData(hdfsURI, "**file", i, "This is test line."); + Tuple2 file = createFileAndFillWithData(hdfsURI, "**file", i, "This is test line."); filesCreated.add(file.f0); } // create the files to be kept for (int i = 0; i < NO_OF_FILES; i++) { - Tuple2 file = fillWithData(hdfsURI, "file", i, "This is test line."); + Tuple2 file = + createFileAndFillWithData(hdfsURI, "file", i, "This is test line."); filesCreated.add(file.f0); + filesKept.add(file.f0.getName()); } TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); format.setFilesFilter(new PathFilter()); + ContinuousFileMonitoringFunction monitoringFunction = new ContinuousFileMonitoringFunction<>(format, hdfsURI, FileProcessingMode.PROCESS_ONCE, 1, INTERVAL); + final FileVerifyingSourceContext context = + new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction, 0, -1); + monitoringFunction.open(new Configuration()); - monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound)); + monitoringFunction.run(context); - Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size()); - for(int i = 0; i < NO_OF_FILES; i++) { - org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i); - Assert.assertTrue(uniqFilesFound.contains(file.toString())); - } + Assert.assertArrayEquals(filesKept.toArray(), context.getSeenFiles().toArray()); - for(org.apache.hadoop.fs.Path file: filesCreated) { + // finally delete the files created for the test. + for (org.apache.hadoop.fs.Path file: filesCreated) { hdfs.delete(file, false); } } + private static class PathFilter extends FilePathFilter { + @Override + public boolean filterPath(Path filePath) { + return filePath.getName().startsWith("**"); + } + } + @Test - public void testFileSplitMonitoringReprocessWithAppended() throws Exception { - final Set uniqFilesFound = new HashSet<>(); + public void testSortingOnModTime() throws Exception { + final long[] modTimes = new long[NO_OF_FILES]; + final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES]; + + // create some files + for (int i = 0; i < NO_OF_FILES; i++) { + Tuple2 file = + createFileAndFillWithData(hdfsURI, "file", i, "This is test line."); + Thread.sleep(10); + + filesCreated[i] = file.f0; + modTimes[i] = hdfs.getFileStatus(file.f0).getModificationTime(); + } + + TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); +
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r83020529 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java --- @@ -60,26 +63,27 @@ private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class); /** -* The minimum interval allowed between consecutive path scans. This is applicable if the -* {@code watchType} is set to {@code PROCESS_CONTINUOUSLY}. +* The minimum interval allowed between consecutive path scans. +* NOTE: Only applicable to the {@code PROCESS_CONTINUOUSLY} mode. */ - public static final long MIN_MONITORING_INTERVAL = 100l; + public static final long MIN_MONITORING_INTERVAL = 1l; /** The path to monitor. */ private final String path; - /** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */ + /** The parallelism of the downstream readers. */ private final int readerParallelism; /** The {@link FileInputFormat} to be read. */ private FileInputFormat format; - /** How often to monitor the state of the directory for new data. */ + /** The interval between consecutive path scans. */ private final long interval; /** Which new data to process (see {@link FileProcessingMode}. */ private final FileProcessingMode watchType; + /** The maximum file modification time seen so far. */ private Long globalModificationTime; --- End diff -- I wonder, should this be `volatile` because it is accessed by the checkpoint thread? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
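A tiny, self-contained illustration of the visibility concern raised here: if the checkpointing thread reads `globalModificationTime` without holding the same lock under which the source thread updates it, the field needs to be `volatile` (or both sides must synchronize on the checkpoint lock). The class and method names below are illustrative, not the actual Flink code.

```java
/** Illustrates two standard ways to make the mod-time visible to the checkpoint thread. */
class ModificationTimeTracker {

    private final Object checkpointLock = new Object();

    // 'volatile' guarantees that a read from the checkpoint thread sees the
    // latest value even if that thread does not take the checkpoint lock.
    private volatile long globalModificationTime = Long.MIN_VALUE;

    /** Called from the source thread while it holds the checkpoint lock. */
    void onSplitsForwarded(long modificationTime) {
        synchronized (checkpointLock) {
            globalModificationTime = Math.max(globalModificationTime, modificationTime);
        }
    }

    /** Called from the checkpointing thread; alternatively it could synchronize on checkpointLock. */
    long snapshotModificationTime() {
        return globalModificationTime;
    }
}
```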
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r83019134 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java --- @@ -146,102 +152,57 @@ public void run(SourceFunction.SourceContext context) throws Exc } } - private void monitorDirAndForwardSplits(FileSystem fs, SourceContext context) throws IOException, JobException { + private void monitorDirAndForwardSplits(FileSystem fs, SourceContext context) throws IOException { assert (Thread.holdsLock(checkpointLock)); - List> splitsByModTime = getInputSplitSortedOnModTime(fs); - - Iterator > it = splitsByModTime.iterator(); - while (it.hasNext()) { - forwardSplits(it.next(), context); - it.remove(); - } - } - - private void forwardSplits(Tuple2 splitsToFwd, SourceContext context) { - assert (Thread.holdsLock(checkpointLock)); - - Long modTime = splitsToFwd.f0; - List splits = splitsToFwd.f1; - - Iterator it = splits.iterator(); - while (it.hasNext()) { - FileInputSplit split = it.next(); - processSplit(split, context); - it.remove(); - } + List eligibleFiles = listEligibleFiles(fs); + Map splitsSortedByModTime = getInputSplitsSortedByModTime(eligibleFiles); - // update the global modification time - if (modTime >= globalModificationTime) { - globalModificationTime = modTime; - } - } - - private void processSplit(FileInputSplit split, SourceContext context) { - LOG.info("Forwarding split: " + split); - context.collect(split); - } - - private List > getInputSplitSortedOnModTime(FileSystem fileSystem) throws IOException { - List eligibleFiles = listEligibleFiles(fileSystem); - if (eligibleFiles.isEmpty()) { - return new ArrayList<>(); - } - - Map splitsToForward = getInputSplits(eligibleFiles); - List > sortedSplitsToForward = new ArrayList<>(); - - for (Map.Entry entry : splitsToForward.entrySet()) { - sortedSplitsToForward.add(new Tuple2<>(entry.getKey(), entry.getValue())); - } - - Collections.sort(sortedSplitsToForward, new Comparator >() { - @Override - public int compare(Tuple2 o1, Tuple2 o2) { - return (int) (o1.f0 - o2.f0); + for (Map.Entry splits: splitsSortedByModTime.entrySet()) { + long modificationTime = splits.getKey(); + for (FileInputSplit split: splits.getValue()) { + LOG.info("Forwarding split: " + split); + context.collect(split); } - }); - - return sortedSplitsToForward; + // update the global modification time + globalModificationTime = Math.max(globalModificationTime, modificationTime); + } } /** -* Creates the input splits for the path to be forwarded to the downstream tasks of the -* {@link ContinuousFileReaderOperator}. Those tasks are going to read their contents for further -* processing. Splits belonging to files in the {@code eligibleFiles} list are the ones -* that are shipped for further processing. +* Creates the input splits to be forwarded to the downstream tasks of the +* {@link ContinuousFileReaderOperator}. Splits are sorted by modification time before +* being forwarded and only splits belonging to files in the {@code eligibleFiles} +* list will be processed. * @param eligibleFiles The files to process. */ - private Map getInputSplits(List eligibleFiles) throws IOException { + private Map getInputSplitsSortedByModTime(List eligibleFiles) throws IOException { + Map splitsByModTime = new TreeMap<>(); if (eligibleFiles.isEmpty()) { - return new HashMap<>(); + return splitsByModTime; } - FileInputSplit[] inputSplits = format.createInputSplits(readerParallelism);
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r83004834 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java --- @@ -111,174 +115,191 @@ protected void testProgram() throws Exception { * reader. * */ - FileCreator fileCreator = new FileCreator(INTERVAL); - Thread t = new Thread(fileCreator); - t.start(); - TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); format.setFilePath(hdfsURI); - - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(4); - - format.setFilesFilter(FilePathFilter.createDefaultFilter()); - ContinuousFileMonitoringFunction monitoringFunction = - new ContinuousFileMonitoringFunction<>(format, hdfsURI, - FileProcessingMode.PROCESS_CONTINUOUSLY, - env.getParallelism(), INTERVAL); - - TypeInformation typeInfo = TypeExtractor.getInputFormatTypes(format); - ContinuousFileReaderOperatorreader = new ContinuousFileReaderOperator<>(format); - TestingSinkFunction sink = new TestingSinkFunction(); - - DataStream splits = env.addSource(monitoringFunction); - splits.transform("FileSplitReader", typeInfo, reader).addSink(sink).setParallelism(1); - env.execute(); - - } catch (Exception e) { - Throwable th = e; - int depth = 0; - - for (; depth < 20; depth++) { - if (th instanceof SuccessException) { - try { - postSubmit(); - } catch (Exception e1) { - e1.printStackTrace(); + format.setFilesFilter(FilePathFilter.createDefaultFilter()); + + // create the stream execution environment with a parallelism > 1 to test + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + ContinuousFileMonitoringFunction monitoringFunction = + new ContinuousFileMonitoringFunction<>(format, hdfsURI, + FileProcessingMode.PROCESS_CONTINUOUSLY, + env.getParallelism(), INTERVAL); + + // the monitor has always DOP 1 + DataStream splits = env.addSource(monitoringFunction); + Assert.assertEquals(1, splits.getParallelism()); + + ContinuousFileReaderOperator reader = new ContinuousFileReaderOperator<>(format); + TypeInformation typeInfo = TypeExtractor.getInputFormatTypes(format); + + // the readers can be multiple + DataStream content = splits.transform("FileSplitReader", typeInfo, reader); + Assert.assertEquals(PARALLELISM, content.getParallelism()); + + // finally for the sink we set the parallelism to 1 so that we can verify the output + TestingSinkFunction sink = new TestingSinkFunction(); + content.addSink(sink).setParallelism(1); + + Thread job = new Thread() { + + @Override + public void run() { + try { + env.execute("ContinuousFileProcessingITCase Job."); + } catch (Exception e) { + Throwable th = e; + int depth = 0; + + for (; depth < 20; depth++) { --- End diff -- Why not the following: ```java for (int depth = 0; depth < 20; depth++) { ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r83014794 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java --- @@ -578,18 +647,28 @@ public void emitWatermark(Watermark mark) { @Override public Object getCheckpointLock() { - return lock; + return new Object(); --- End diff -- Why always a new object? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
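The point behind this question, sketched minimally below: monitors only provide mutual exclusion when both threads synchronize on the same object, so a test context that hands out a fresh `Object` from every `getCheckpointLock()` call effectively disables the locking the source function relies on. Class and method names are illustrative.

```java
/** Contrasts a shared checkpoint lock with a fresh-object-per-call "lock". */
class CheckpointLockSketch {

    private final Object sharedLock = new Object();

    /** Correct: every caller synchronizes on the same monitor. */
    Object getSharedCheckpointLock() {
        return sharedLock;
    }

    /**
     * Suspicious: each call returns a different object, so two threads doing
     * synchronized(getFreshCheckpointLock()) never block each other.
     */
    Object getFreshCheckpointLock() {
        return new Object();
    }
}
```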
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r83015686 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java --- @@ -578,18 +647,28 @@ public void emitWatermark(Watermark mark) { @Override public Object getCheckpointLock() { - return lock; + return new Object(); } @Override public void close() { } } + / Auxiliary Methods / + + private int getLineNo(String line) { + String[] tkns = line.split("\\s"); + Assert.assertEquals(6, tkns.length); + return Integer.parseInt(tkns[tkns.length - 1]); + } + /** -* Fill the file with content. +* Create a file with pre-determined content. --- End diff -- What is the format of the content? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r83022623 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java --- @@ -257,74 +190,158 @@ public void open(Configuration parameters) throws Exception { long failurePosMax = (long) (0.7 * LINES_PER_FILE); elementsToFailure = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; - - if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) { - finalCollectedContent = new HashMap<>(); - for (Map.Entryresult: collectedContent.entrySet()) { - finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue())); - } - throw new SuccessException(); - } - } - - @Override - public void close() { - try { - super.close(); - } catch (Exception e) { - e.printStackTrace(); - } } @Override public void invoke(String value) throws Exception { - int fileIdx = Character.getNumericValue(value.charAt(0)); + int fileIdx = getFileIdx(value); - Set content = collectedContent.get(fileIdx); + Set content = actualContent.get(fileIdx); if (content == null) { content = new HashSet<>(); - collectedContent.put(fileIdx, content); + actualContent.put(fileIdx, content); } + // detect duplicate lines. if (!content.add(value + "\n")) { fail("Duplicate line: " + value); System.exit(0); } - elementCounter++; + + // this is termination if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) { - finalCollectedContent = new HashMap<>(); - for (Map.Entry result: collectedContent.entrySet()) { - finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue())); - } + actualCollectedContent = actualContent; throw new SuccessException(); } - count++; - if (!hasFailed) { + // add some latency so that we have at least one checkpoint in + if (!hasFailed && successfulCheckpoints == 0) { Thread.sleep(2); - if (numSuccessfulCheckpoints >= 1 && count >= elementsToFailure) { - hasFailed = true; - throw new Exception("Task Failure"); - } + } + + // simulate a node failure + if (!hasFailed && successfulCheckpoints > 0 && elementCounter >= elementsToFailure) { --- End diff -- Is it assured that a checkpoint has been taken place? Or is the exception simply skipped here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2618#discussion_r83005541 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java --- @@ -54,8 +55,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentLinkedQueue; -public class ContinuousFileMonitoringTest { +public class ContinuousFileProcessingTests { --- End diff -- By convention, unit tests are named `*Test` and integration tests `*ITCase`. Actually, it affects when they're executed as integration tests will be executed in Maven's `verify` phase. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82990475 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala --- @@ -27,48 +27,65 @@ import scala.collection.mutable.ListBuffer object RexNodeTranslator { --- End diff -- Good point. I renamed it to `ProjectionTranslator`. Done. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4691) Add group-windows for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568534#comment-15568534 ] ASF GitHub Bot commented on FLINK-4691: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82990475 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala --- @@ -27,48 +27,65 @@ import scala.collection.mutable.ListBuffer object RexNodeTranslator { --- End diff -- Good point. I renamed it to `ProjectionTranslator`. Done. > Add group-windows for streaming tables > --- > > Key: FLINK-4691 > URL: https://issues.apache.org/jira/browse/FLINK-4691 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Add Tumble, Slide, Session group-windows for streaming tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > > Implementation of group-windows on streaming tables. This includes > implementing the API of group-windows, the logical validation for > group-windows, and the definition of the “rowtime” and “systemtime” keywords. > Group-windows on batch tables won’t be initially supported and will throw an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82999720 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.typeutils + +import java.util.Objects + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.common.typeutils.base.{LongComparator, LongSerializer} +import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer} +import org.apache.flink.api.table.typeutils.RowIntervalTypeInfo.instantiateComparator +import org.apache.flink.util.Preconditions._ + +/** + * TypeInformation for row intervals. + */ +@SerialVersionUID(-1306179424364925258L) +class RowIntervalTypeInfo[T]( --- End diff -- OK, I understand the issue with the instance checks (although I would think these checks are not correct if they fail in such a cases). However, providing a full implementation for each internal type does not seem right. How about we create a special abstract TypeInfo for types that are not required at execution time and implement all irrelevant methods (arity, serializer, comparator, etc.) with `UnsupportedOperationException`. `RowIntervalTypeInfo` and `TimeIntervalTypeInfo` would extend that and just provide the type. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4691) Add group-windows for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568653#comment-15568653 ] ASF GitHub Bot commented on FLINK-4691: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82999720 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.typeutils + +import java.util.Objects + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.common.typeutils.base.{LongComparator, LongSerializer} +import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer} +import org.apache.flink.api.table.typeutils.RowIntervalTypeInfo.instantiateComparator +import org.apache.flink.util.Preconditions._ + +/** + * TypeInformation for row intervals. + */ +@SerialVersionUID(-1306179424364925258L) +class RowIntervalTypeInfo[T]( --- End diff -- OK, I understand the issue with the instance checks (although I would think these checks are not correct if they fail in such a cases). However, providing a full implementation for each internal type does not seem right. How about we create a special abstract TypeInfo for types that are not required at execution time and implement all irrelevant methods (arity, serializer, comparator, etc.) with `UnsupportedOperationException`. `RowIntervalTypeInfo` and `TimeIntervalTypeInfo` would extend that and just provide the type. > Add group-windows for streaming tables > --- > > Key: FLINK-4691 > URL: https://issues.apache.org/jira/browse/FLINK-4691 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Add Tumble, Slide, Session group-windows for streaming tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > > Implementation of group-windows on streaming tables. This includes > implementing the API of group-windows, the logical validation for > group-windows, and the definition of the “rowtime” and “systemtime” keywords. > Group-windows on batch tables won’t be initially supported and will throw an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
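A rough Java sketch of this proposal (the classes under review are Scala): a base `TypeInformation` for planning-only types whose execution-time methods all fail fast with `UnsupportedOperationException`, so that `RowIntervalTypeInfo` and `TimeIntervalTypeInfo` would only have to supply the type class. The class name and method bodies are illustrative, not the implementation that was eventually merged.

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

/**
 * Sketch of a planning-only TypeInformation: everything the runtime would need
 * (arity, serializer, key support, ...) fails fast; only the type class is real.
 */
public abstract class InternalTypeInfo<T> extends TypeInformation<T> {

    private final Class<T> clazz;

    protected InternalTypeInfo(Class<T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public Class<T> getTypeClass() {
        return clazz;
    }

    private static UnsupportedOperationException planningOnly() {
        return new UnsupportedOperationException("This type is only used during query planning.");
    }

    @Override
    public boolean isBasicType() { throw planningOnly(); }

    @Override
    public boolean isTupleType() { throw planningOnly(); }

    @Override
    public int getArity() { throw planningOnly(); }

    @Override
    public int getTotalFields() { throw planningOnly(); }

    @Override
    public boolean isKeyType() { throw planningOnly(); }

    @Override
    public TypeSerializer<T> createSerializer(ExecutionConfig config) { throw planningOnly(); }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof InternalTypeInfo;
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof InternalTypeInfo && ((InternalTypeInfo<?>) obj).clazz == clazz;
    }

    @Override
    public int hashCode() {
        return clazz.hashCode();
    }

    @Override
    public String toString() {
        return "InternalTypeInfo(" + clazz.getSimpleName() + ")";
    }
}
```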
[GitHub] flink issue #2619: [FLINK-4108] [scala] Consider ResultTypeQueryable for inp...
Github user albertoRamon commented on the issue: https://github.com/apache/flink/pull/2619 Works OK ¡¡ [Capture Output](http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/jdbc-JDBCInputFormat-tp9393p9489.html) Thanks, Alb --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4108) NPE in Row.productArity
[ https://issues.apache.org/jira/browse/FLINK-4108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568712#comment-15568712 ] ASF GitHub Bot commented on FLINK-4108: --- Github user albertoRamon commented on the issue: https://github.com/apache/flink/pull/2619 Works OK ¡¡ [Capture Output](http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/jdbc-JDBCInputFormat-tp9393p9489.html) Thanks, Alb > NPE in Row.productArity > --- > > Key: FLINK-4108 > URL: https://issues.apache.org/jira/browse/FLINK-4108 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats, Type > Serialization System >Affects Versions: 1.1.0 >Reporter: Martin Scholl >Assignee: Timo Walther > > [this is my first issue request here, please apologize if something is > missing] > JDBCInputFormat of flink 1.1-SNAPSHOT fails with an NPE in Row.productArity: > {quote} > java.io.IOException: Couldn't access resultSet > at > org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:288) > at > org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:98) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException > at org.apache.flink.api.table.Row.productArity(Row.scala:28) > at > org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:279) > ... 4 more > {quote} > The code reproduce this can be found in this gist: > https://gist.github.com/zeitgeist/b91a60460661618ca4585e082895c616 > The reason for the NPE, I believe, is the way through which Flink creates Row > instances through Kryo. As Row expects the number of fields to allocate as a > parameter, which Kryo does not provide, the ‘fields’ member of Row ends up > being null. As I’m neither a reflection nor a Kryo expert, I rather leave a > true analysis to more knowledgable programmers. > Part of the aforementioned example is a not very elegant workaround though a > custom type and a cast (function {{jdbcNoIssue}} + custom Row type {{MyRow}}) > to serve as a further hint towards my theory. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2439: [FLINK-4450]update storm verion to 1.0.0 in flink-storm a...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2439 Okay, then we should re-add all the dependency exclusions that were removed from the `flink-storm/pom.xml`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4813) Having flink-test-utils as a dependency outside Flink fails the build
[ https://issues.apache.org/jira/browse/FLINK-4813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568657#comment-15568657 ] Maximilian Michels commented on FLINK-4813: --- +1 That sounds like a good solution. > Having flink-test-utils as a dependency outside Flink fails the build > - > > Key: FLINK-4813 > URL: https://issues.apache.org/jira/browse/FLINK-4813 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.2.0 >Reporter: Robert Metzger > > The {{flink-test-utils}} depend on {{hadoop-minikdc}}, which has a > dependency, which is only resolvable, if the {{maven-bundle-plugin}} is > loaded. > This is the error message > {code} > [ERROR] Failed to execute goal on project quickstart-1.2-tests: Could not > resolve dependencies for project > com.dataartisans:quickstart-1.2-tests:jar:1.0-SNAPSHOT: Failure to find > org.apache.directory.jdbm:apacheds-jdbm1:bundle:2.0.0-M2 in > https://repo.maven.apache.org/maven2 was cached in the local repository, > resolution will not be reattempted until the update interval of central has > elapsed or updates are forced -> [Help 1] > {code} > {{flink-parent}} loads that plugin, so all "internal" dependencies to the > test utils can resolve the plugin. > Right now, users have to use the maven bundle plugin to use our test utils > externally. > By making the hadoop minikdc dependency optional, we can probably resolve the > issues. Then, only users who want to use the security-related tools in the > test utils need to manually add the hadoop minikdc dependency + the plugin. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2608) Arrays.asList(..) does not work with CollectionInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568798#comment-15568798 ] ASF GitHub Bot commented on FLINK-2608: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2623 As far as I read it, Kryo 3.x is not strictly serialization compatible with 2.x, hence the major version number bump. If the interfaces are still stable, then it should be fine to bump the chill dependency version, exclude any kryo dependency, and add our own 2.x kryo dependency. I would prefer that approach. > Arrays.asList(..) does not work with CollectionInputFormat > -- > > Key: FLINK-2608 > URL: https://issues.apache.org/jira/browse/FLINK-2608 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Affects Versions: 0.9, 0.10.0 >Reporter: Maximilian Michels >Priority: Minor > Fix For: 1.0.0 > > > When using Arrays.asList(..) as input for a CollectionInputFormat, the > serialization/deserialization fails when deploying the task. > See the following program: > {code:java} > public class WordCountExample { > public static void main(String[] args) throws Exception { > final ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > DataSet text = env.fromElements( > "Who's there?", > "I think I hear them. Stand, ho! Who's there?"); > // DOES NOT WORK > List elements = Arrays.asList(0, 0, 0); > // The following works: > //List elements = new ArrayList<>(new int[] {0,0,0}); > DataSet set = env.fromElements(new TestClass(elements)); > DataSet> wordCounts = text > .flatMap(new LineSplitter()) > .withBroadcastSet(set, "set") > .groupBy(0) > .sum(1); > wordCounts.print(); > } > public static class LineSplitter implements FlatMapFunction Tuple2 > { > @Override > public void flatMap(String line, Collector Integer>> out) { > for (String word : line.split(" ")) { > out.collect(new Tuple2 (word, 1)); > } > } > } > public static class TestClass implements Serializable { > private static final long serialVersionUID = -2932037991574118651L; > List integerList; > public TestClass(List integerList){ > this.integerList=integerList; > } > } > {code} > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task > 'DataSource (at main(Test.java:32) > (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the > InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) >
[jira] [Commented] (FLINK-4691) Add group-windows for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568544#comment-15568544 ] ASF GitHub Bot commented on FLINK-4691: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82991170 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala --- @@ -23,6 +23,8 @@ import org.apache.flink.api.table.Row import java.math.BigDecimal import java.math.BigInteger +import org.apache.flink.streaming.api.windowing.windows.Window --- End diff -- Done. > Add group-windows for streaming tables > --- > > Key: FLINK-4691 > URL: https://issues.apache.org/jira/browse/FLINK-4691 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Add Tumble, Slide, Session group-windows for streaming tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > > Implementation of group-windows on streaming tables. This includes > implementing the API of group-windows, the logical validation for > group-windows, and the definition of the “rowtime” and “systemtime” keywords. > Group-windows on batch tables won’t be initially supported and will throw an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82993938 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyRead.scala --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime.aggregate + +import org.apache.flink.streaming.api.windowing.windows.Window + +/** + * Base class for reading a window property. The property will be extracted once and + * can be read multiple times. + */ +trait PropertyRead[T] extends Serializable { + + def extract(window: Window): Unit --- End diff -- In general I agree to your solution, but right now we support to have the same operation multiple times in a query. E.g. ``` .window(Session withGap 3.milli on 'rowtime as 'w) .select('string, 'w.end, 'w.end) ``` Your code would fail. Let's leave the aggregation as it is for now and rework it again later. The aggregations have to be reworked anyway for efficiency. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4450) update storm version to 1.0.0
[ https://issues.apache.org/jira/browse/FLINK-4450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568782#comment-15568782 ] ASF GitHub Bot commented on FLINK-4450: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2439 Okay, then we should re-add all the dependency exclusions that were removed from the `flink-storm/pom.xml`. > update storm version to 1.0.0 > - > > Key: FLINK-4450 > URL: https://issues.apache.org/jira/browse/FLINK-4450 > Project: Flink > Issue Type: Improvement > Components: flink-contrib >Reporter: yuzhongliu > Fix For: 2.0.0 > > > The storm package path was changed in the new version > storm old version package: > backtype.storm.* > storm new version package: > org.apache.storm.* > shall we update flink/flink-storm code to the new storm version? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2623: [FLINK-2608] Updated Twitter Chill version.
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2623 As far as I read it, Kryo 3.x is not strictly serialization compatible with 2.x, hence the major version number bump. If the interfaces are still stable, then it should be fine to bump the chill dependency version, exclude any kryo dependency, and add our own 2.x kryo dependency. I would prefer that approach. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4691) Add group-windows for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568532#comment-15568532 ] ASF GitHub Bot commented on FLINK-4691: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82990268 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/properties.scala --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions + +import org.apache.calcite.rex.RexNode +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo +import org.apache.flink.api.table.FlinkRelBuilder.NamedProperty +import org.apache.flink.api.table.validate.{ValidationFailure, ValidationSuccess} + +abstract class Property(child: Expression) extends UnaryExpression { + + override def toString = s"Property($child)" + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = +throw new UnsupportedOperationException("Property cannot be transformed to RexNode.") + + override private[flink] def validateInput() = +if (child.isInstanceOf[WindowReference]) { --- End diff -- I just tried to keep the names short. Because the Scala line lengths are pretty strict. Done. > Add group-windows for streaming tables > --- > > Key: FLINK-4691 > URL: https://issues.apache.org/jira/browse/FLINK-4691 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Add Tumble, Slide, Session group-windows for streaming tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > > Implementation of group-windows on streaming tables. This includes > implementing the API of group-windows, the logical validation for > group-windows, and the definition of the “rowtime” and “systemtime” keywords. > Group-windows on batch tables won’t be initially supported and will throw an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3398) Flink Kafka consumer should support auto-commit opt-outs
[ https://issues.apache.org/jira/browse/FLINK-3398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568564#comment-15568564 ] Robert Metzger commented on FLINK-3398: --- Maybe we should close this JIRA and have the discussion only in FLINK-4280. > Flink Kafka consumer should support auto-commit opt-outs > > > Key: FLINK-3398 > URL: https://issues.apache.org/jira/browse/FLINK-3398 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Shikhar Bhushan >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.2.0 > > > Currently the Kafka source will commit consumer offsets to Zookeeper, either > upon a checkpoint if checkpointing is enabled, otherwise periodically based > on {{auto.commit.interval.ms}} > It should be possible to opt-out of committing consumer offsets to Zookeeper. > Kafka has this config as {{auto.commit.enable}} (0.8) and > {{enable.auto.commit}} (0.9). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
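For context, the two Kafka settings named in the description are plain consumer properties. A hedged sketch of how a user passes them to the connector today (topic name, group id and connection strings are placeholders; whether the connector actually honours the flag in every mode is exactly what this ticket is about):
{code}
import java.util.Properties
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val props = new Properties()
props.setProperty("zookeeper.connect", "localhost:2181")
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "my-consumer-group")
props.setProperty("auto.commit.enable", "false")    // Kafka 0.8 property name
// props.setProperty("enable.auto.commit", "false") // Kafka 0.9 property name

val consumer = new FlinkKafkaConsumer08[String]("my-topic", new SimpleStringSchema(), props)
{code}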
[jira] [Commented] (FLINK-4604) Add support for standard deviation/variance
[ https://issues.apache.org/jira/browse/FLINK-4604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568664#comment-15568664 ] Anton Mushin commented on FLINK-4604: - I tried check function in {{org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule#matches}}, but something went wrong :) I did so {code:title=org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule} override def matches(call: RelOptRuleCall): Boolean = { val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate] // check if we have distinct aggregates val distinctAggs = agg.getAggCallList.exists(_.isDistinct) if (distinctAggs) { throw new TableException("DISTINCT aggregates are currently not supported.") } // check if we have grouping sets val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet if (groupSets || agg.indicator) { throw new TableException("GROUPING SETS are currently not supported.") } (!distinctAggs && !groupSets && !agg.indicator) && !AggregateReduceFunctionsRule.INSTANCE.matches(call) } {code} And I got next plan and exception: {noformat} DataSetCalc(select=[CAST(/(-(CASE(=($f1, 0), null, $f0), /(*(CASE(=($f3, 0), null, $f2), CASE(=($f3, 0), null, $f2)), $f3)), CASE(=($f3, 1), null, -($f3, 1 AS $f0, CAST(/(-(CASE(=($f5, 0), null, $f4), /(*(CASE(=($f7, 0), null, $f6), CASE(=($f7, 0), null, $f6)), $f7)), CASE(=($f7, 1), null, -($f7, 1 AS $f1, CAST(/(-(CASE(=($f9, 0), null, $f8), /(*(CASE(=($f11, 0), null, $f10), CASE(=($f11, 0), null, $f10)), $f11)), CASE(=($f11, 1), null, -($f11, 1 AS $f2, CAST(/(-(CASE(=($f13, 0), null, $f12), /(*(CASE(=($f15, 0), null, $f14), CASE(=($f15, 0), null, $f14)), $f15)), CASE(=($f15, 1), null, -($f15, 1 AS $f3, CAST(/(-(CASE(=($f17, 0), null, $f16), /(*(CASE(=($f19, 0), null, $f18), CASE(=($f19, 0), null, $f18)), $f19)), CASE(=($f19, 1), null, -($f19, 1 AS $f4, CAST(/(-(CASE(=($f21, 0), null, $f20), /(*(CASE(=($f23, 0), null, $f22), CASE(=($f23, 0), null, $f22)), $f23)), CASE(=($f23, 1), null, -($f23, 1 AS $f5]) DataSetAggregate(select=[$SUM0($f6) AS $f0, COUNT($f6) AS $f1, $SUM0(_1) AS $f2, COUNT(_1) AS $f3, $SUM0($f7) AS $f4, COUNT($f7) AS $f5, $SUM0(_2) AS $f6, COUNT(_2) AS $f7, $SUM0($f8) AS $f8, COUNT($f8) AS $f9, $SUM0(_3) AS $f10, COUNT(_3) AS $f11, $SUM0($f9) AS $f12, COUNT($f9) AS $f13, $SUM0(_4) AS $f14, COUNT(_4) AS $f15, $SUM0($f10) AS $f16, COUNT($f10) AS $f17, $SUM0(_5) AS $f18, COUNT(_5) AS $f19, $SUM0($f11) AS $f20, COUNT($f11) AS $f21, $SUM0(_6) AS $f22, COUNT(_6) AS $f23]) DataSetCalc(select=[_1, _2, _3, _4, _5, _6]) DataSetScan(table=[[_DataSetTable_0]]) {noformat} {noformat} org.apache.flink.api.table.TableException: Type NULL is not supported. Null values must have a supported type. 
at org.apache.flink.api.table.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:128) at org.apache.flink.api.table.codegen.CodeGenerator.visitLiteral(CodeGenerator.scala:553) at org.apache.flink.api.table.codegen.CodeGenerator.visitLiteral(CodeGenerator.scala:56) at org.apache.calcite.rex.RexLiteral.accept(RexLiteral.java:658) at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675) at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675) at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56) at org.apache.calcite.rex.RexCall.accept(RexCall.java:108) at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675) at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at
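For readers decoding the plan above: the rewrite that {{AggregateReduceFunctionsRule}} produces for a sample variance is, as far as the generated expressions can be read, the standard sum/count decomposition
{noformat}
VAR_SAMP(x) = ( SUM(x * x) - SUM(x) * SUM(x) / COUNT(x) ) / ( COUNT(x) - 1 )
{noformat}
with CASE guards that return NULL when the count is 0 or 1. Those NULL literals are plausibly what the code generator trips over in the "Type NULL is not supported" exception.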
[jira] [Updated] (FLINK-3037) Make the behavior of the Kafka consumer configurable if the offsets to restore from are not available
[ https://issues.apache.org/jira/browse/FLINK-3037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-3037: -- Description: Currently, if the {{FlinkKafkaConsumer}} is restoring a checkpoint and the offset is not available anymore in Kafka, its restoring according to {{auto.offset.reset}}. This leads to inconsistent behavior (not exactly-once anymore) because the operators will not receive data in sync with the checkpoint. With this pull request, I would like to make the behavior controllable, using a flag. The simplest approach would be to let the consumer fail in that case. was: Currently, if the {{FlinkKafkaConsumer}} is restoring a checkpoint and the offset is not available anymore in Kafka, its restoring according to {{auto.offset.reset}}. This leads to inconsistent behavior (not exactly-once anymore) because the operators will not receive data in sync with the checkpoint. With this pull request, I would like to make the behavior controllable, using a flag. > Make the behavior of the Kafka consumer configurable if the offsets to > restore from are not available > - > > Key: FLINK-3037 > URL: https://issues.apache.org/jira/browse/FLINK-3037 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Robert Metzger > > Currently, if the {{FlinkKafkaConsumer}} is restoring a checkpoint and the > offset is not available anymore in Kafka, its restoring according to > {{auto.offset.reset}}. > This leads to inconsistent behavior (not exactly-once anymore) because the > operators will not receive data in sync with the checkpoint. > With this pull request, I would like to make the behavior controllable, using > a flag. The simplest approach would be to let the consumer fail in that case. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
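Purely as an illustration of the flag mentioned in the description (the names below are made up, not an existing or planned Flink API), the two behaviours could be spelled out along these lines:
{code}
// Hypothetical, illustrative only -- not an existing Flink class.
sealed trait RestoredOffsetOutOfRangeBehavior

object RestoredOffsetOutOfRangeBehavior {
  /** Simplest option suggested in the description: fail the consumer / the job. */
  case object FailJob extends RestoredOffsetOutOfRangeBehavior
  /** Current behaviour: fall back to whatever 'auto.offset.reset' dictates. */
  case object FollowAutoOffsetReset extends RestoredOffsetOutOfRangeBehavior
}
{code}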
[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2509#discussion_r83003348 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java --- @@ -345,16 +343,19 @@ protected static void validateZooKeeperConfig(Properties props) { } } - private static long getInvalidOffsetBehavior(Properties config) { + /** +* Check for invalid "auto.offset.reset" values. Should be called in constructor for eager checking before submitting +* the job. Note that 'none' is also considered invalid, as we don't want to deliberately throw an exception --- End diff -- Thank you for the pointer. We are discussing this issue here https://issues.apache.org/jira/browse/FLINK-4280 and here https://issues.apache.org/jira/browse/FLINK-3037 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
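A hedged sketch (in Scala, not the actual Java code under review) of the eager constructor-time check the comment describes, so that a bad `auto.offset.reset` value fails at job submission rather than at runtime; the method name and the set of accepted values are illustrative:
```scala
import java.util.Properties

def validateAutoOffsetResetValue(config: Properties): Unit = {
  config.getProperty("auto.offset.reset", "largest") match {
    case "largest" | "smallest" | "latest" | "earliest" => // accepted
    case invalid =>
      // 'none' (and any other value) is rejected eagerly, before the job is submitted
      throw new IllegalArgumentException(
        s"Unsupported value '$invalid' for 'auto.offset.reset'")
  }
}
```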
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r83008188 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.typeutils + +import java.util.Objects + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.common.typeutils.base.{LongComparator, LongSerializer} +import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer} +import org.apache.flink.api.table.typeutils.RowIntervalTypeInfo.instantiateComparator +import org.apache.flink.util.Preconditions._ + +/** + * TypeInformation for row intervals. + */ +@SerialVersionUID(-1306179424364925258L) +class RowIntervalTypeInfo[T]( --- End diff -- That's a good idea. I will do that. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r83009444 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyRead.scala --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime.aggregate + +import org.apache.flink.streaming.api.windowing.windows.Window + +/** + * Base class for reading a window property. The property will be extracted once and + * can be read multiple times. + */ +trait PropertyRead[T] extends Serializable { + + def extract(window: Window): Unit --- End diff -- Yes, every aggregation should only happen once. We should definitely do that. Btw. we can also get rid of `AvgAggregate` once `AggregateReduceFunctionsRule` is enabled. So there are many open issues with the current aggregate implementation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
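To illustrate the "extract once, read many times" contract in the scaladoc above, a hypothetical concrete implementation (the class name, the extra `read()` accessor and the `TimeWindow` handling are assumptions, not part of this PR):
```scala
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}

class WindowEndRead extends PropertyRead[Long] {
  private var end: Long = -1L

  // Called once per emitted window result.
  override def extract(window: Window): Unit = window match {
    case t: TimeWindow => end = t.getEnd
    case _             => end = -1L // e.g. count windows carry no end timestamp
  }

  // Can then be read any number of times, e.g. for 'w.end appearing twice in a select.
  def read(): Long = end
}
```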
[jira] [Commented] (FLINK-2608) Arrays.asList(..) does not work with CollectionInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568745#comment-15568745 ] ASF GitHub Bot commented on FLINK-2608: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2623 This change adds Kryo-shaded to our dependency tree: ``` [INFO] | +- com.twitter:chill_2.10:jar:0.8.1:compile [INFO] | | +- com.twitter:chill-java:jar:0.8.1:compile [INFO] | | \- com.esotericsoftware:kryo-shaded:jar:3.0.3:compile [INFO] | | \- com.esotericsoftware:minlog:jar:1.3.0:compile ``` I suspect that maven is not recognizing this because chill seems to depend on `kryo-shaded`. Apparently, `kryo-shaded` has a shaded ASM version included, but it is not relocating the regular Kryo classes. So we'll end up having two Kryo versions in our classpath. So if we want to upgrade Kryo, we need to do it explicitly, to avoid having two Kryo versions in our classpath. Another issue we need to consider is the serialization compatibility. Savepoints in Flink could potentially contain data serialized with Kryo 2.24. If we want to provide savepoint compatibility between Flink 1.1 and 1.2, we need to consider that. According to the Kryo documentation, 2.24 to 3.0.0 is serialization compatible (I hope the same holds true for chill): https://github.com/EsotericSoftware/kryo/blob/master/CHANGES.md#2240---300-2014-10-04 I would like to hear @StephanEwen and @uce's opinion on this. > Arrays.asList(..) does not work with CollectionInputFormat > -- > > Key: FLINK-2608 > URL: https://issues.apache.org/jira/browse/FLINK-2608 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Affects Versions: 0.9, 0.10.0 >Reporter: Maximilian Michels >Priority: Minor > Fix For: 1.0.0 > > > When using Arrays.asList(..) as input for a CollectionInputFormat, the > serialization/deserialization fails when deploying the task. > See the following program: > {code:java} > public class WordCountExample { > public static void main(String[] args) throws Exception { > final ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > DataSet text = env.fromElements( > "Who's there?", > "I think I hear them. Stand, ho! 
Who's there?"); > // DOES NOT WORK > List elements = Arrays.asList(0, 0, 0); > // The following works: > //List elements = new ArrayList<>(new int[] {0,0,0}); > DataSet set = env.fromElements(new TestClass(elements)); > DataSet> wordCounts = text > .flatMap(new LineSplitter()) > .withBroadcastSet(set, "set") > .groupBy(0) > .sum(1); > wordCounts.print(); > } > public static class LineSplitter implements FlatMapFunction Tuple2 > { > @Override > public void flatMap(String line, Collector Integer>> out) { > for (String word : line.split(" ")) { > out.collect(new Tuple2 (word, 1)); > } > } > } > public static class TestClass implements Serializable { > private static final long serialVersionUID = -2932037991574118651L; > List integerList; > public TestClass(List integerList){ > this.integerList=integerList; > } > } > {code} > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task > 'DataSource (at main(Test.java:32) > (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the > InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190) > at >
[jira] [Commented] (FLINK-4691) Add group-windows for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568768#comment-15568768 ] ASF GitHub Bot commented on FLINK-4691: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r83008188 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.typeutils + +import java.util.Objects + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.common.typeutils.base.{LongComparator, LongSerializer} +import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer} +import org.apache.flink.api.table.typeutils.RowIntervalTypeInfo.instantiateComparator +import org.apache.flink.util.Preconditions._ + +/** + * TypeInformation for row intervals. + */ +@SerialVersionUID(-1306179424364925258L) +class RowIntervalTypeInfo[T]( --- End diff -- That's a good idea. I will do that. > Add group-windows for streaming tables > --- > > Key: FLINK-4691 > URL: https://issues.apache.org/jira/browse/FLINK-4691 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Add Tumble, Slide, Session group-windows for streaming tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > > Implementation of group-windows on streaming tables. This includes > implementing the API of group-windows, the logical validation for > group-windows, and the definition of the “rowtime” and “systemtime” keywords. > Group-windows on batch tables won’t be initially supported and will throw an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4691) Add group-windows for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568789#comment-15568789 ] ASF GitHub Bot commented on FLINK-4691: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r83009444 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyRead.scala --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime.aggregate + +import org.apache.flink.streaming.api.windowing.windows.Window + +/** + * Base class for reading a window property. The property will be extracted once and + * can be read multiple times. + */ +trait PropertyRead[T] extends Serializable { + + def extract(window: Window): Unit --- End diff -- Yes, every aggregation should only happen once. We should definitely do that. Btw. we can also get rid of `AvgAggregate` once `AggregateReduceFunctionsRule` is enabled. So there are many open issues with the current aggregate implementation. > Add group-windows for streaming tables > --- > > Key: FLINK-4691 > URL: https://issues.apache.org/jira/browse/FLINK-4691 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Add Tumble, Slide, Session group-windows for streaming tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > > Implementation of group-windows on streaming tables. This includes > implementing the API of group-windows, the logical validation for > group-windows, and the definition of the “rowtime” and “systemtime” keywords. > Group-windows on batch tables won’t be initially supported and will throw an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r83000595 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/windows/Session.java --- @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.table.windows; + +import org.apache.flink.api.table.SessionWindow; + +/** + * Helper class for creating a session window. Session windows are ideal for cases where the + * window boundaries need to adjust to the incoming data. In a session window it is possible to + * have windows that start at individual points in time for each key and that end once there has + * been a certain period of inactivity. + */ +public class Session { --- End diff -- I see, that makes sense. I think we should we convert the Java classes into Scala classes because they are in the Scala source folder. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
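As a usage note for the `Session` helper being discussed, this is how it appears in a query, modelled on the snippet quoted earlier in this review (field names and gap are taken from that snippet; the exact API surface is what this PR is still settling):
```scala
val result = table
  .window(Session withGap 3.milli on 'rowtime as 'w)
  .select('string, 'w.end)
```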
[jira] [Commented] (FLINK-4691) Add group-windows for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568669#comment-15568669 ] ASF GitHub Bot commented on FLINK-4691: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r83000595 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/windows/Session.java --- @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.table.windows; + +import org.apache.flink.api.table.SessionWindow; + +/** + * Helper class for creating a session window. Session windows are ideal for cases where the + * window boundaries need to adjust to the incoming data. In a session window it is possible to + * have windows that start at individual points in time for each key and that end once there has + * been a certain period of inactivity. + */ +public class Session { --- End diff -- I see, that makes sense. I think we should we convert the Java classes into Scala classes because they are in the Scala source folder. > Add group-windows for streaming tables > --- > > Key: FLINK-4691 > URL: https://issues.apache.org/jira/browse/FLINK-4691 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Add Tumble, Slide, Session group-windows for streaming tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > > Implementation of group-windows on streaming tables. This includes > implementing the API of group-windows, the logical validation for > group-windows, and the definition of the “rowtime” and “systemtime” keywords. > Group-windows on batch tables won’t be initially supported and will throw an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4283) ExecutionGraphRestartTest fails
[ https://issues.apache.org/jira/browse/FLINK-4283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568819#comment-15568819 ] Stephan Ewen commented on FLINK-4283: - Is this issue still relevant, or has this instability been resolved? [~AlexanderShoshin] what is your approach to fix this? > ExecutionGraphRestartTest fails > --- > > Key: FLINK-4283 > URL: https://issues.apache.org/jira/browse/FLINK-4283 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.0 > Environment: Ubuntu 14.04 > W10 >Reporter: Chesnay Schepler >Assignee: Alexander Shoshin > Labels: test-stability > > I encounter reliable failures for the following tests: > testRestartAutomatically(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest) > Time elapsed: 120.089 sec <<< FAILURE! > java.lang.AssertionError: expected: but was: > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:743) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:144) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testRestartAutomatically(ExecutionGraphRestartTest.java:155) > taskShouldNotFailWhenFailureRateLimitWasNotExceeded(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest) > Time elapsed: 2.055 sec <<< FAILURE! > java.lang.AssertionError: expected: but was: > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:743) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:144) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.taskShouldNotFailWhenFailureRateLimitWasNotExceeded(ExecutionGraphRestartTest.java:180) > testFailingExecutionAfterRestart(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest) > Time elapsed: 120.079 sec <<< FAILURE! > java.lang.AssertionError: expected: but was: > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:743) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:144) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testFailingExecutionAfterRestart(ExecutionGraphRestartTest.java:397) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4806) ResourceManager stop listening JobManager's leader address
[ https://issues.apache.org/jira/browse/FLINK-4806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568527#comment-15568527 ] Kurt Young commented on FLINK-4806: --- Yes, you are right about this. [~mxm] > ResourceManager stop listening JobManager's leader address > -- > > Key: FLINK-4806 > URL: https://issues.apache.org/jira/browse/FLINK-4806 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Kurt Young > > Currently in flip-6 branch, when RM receives a registration from JM, it will > verify the leader session id of JM and attach a JobManagerLeaderListener with > it for monitoring the future changes. > Maybe we can simplify it a little bit. We don't monitor the leadership change > of the JM, after the verification passed when JM registered itself, we simply > write down the leader id of the registered the JM for future rpc filtering, > and start heartbeat monitor with JM. > If JM's leadership has been changed, the new JM will register itself, and RM > will verify its leadership when received registration, and RM can decide > whether accept or reject the registration. It's kind of like JM's information > in RM is preempted only by new JM but not by RM itself with leadership change > listener. By doing this, we can simplify the logic inside RM and don't have > to do any error handling with leader listener. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82991170 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala --- @@ -23,6 +23,8 @@ import org.apache.flink.api.table.Row import java.math.BigDecimal import java.math.BigInteger +import org.apache.flink.streaming.api.windowing.windows.Window --- End diff -- Done. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4691) Add group-windows for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568586#comment-15568586 ] ASF GitHub Bot commented on FLINK-4691: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82993938 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyRead.scala --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime.aggregate + +import org.apache.flink.streaming.api.windowing.windows.Window + +/** + * Base class for reading a window property. The property will be extracted once and + * can be read multiple times. + */ +trait PropertyRead[T] extends Serializable { + + def extract(window: Window): Unit --- End diff -- In general I agree to your solution, but right now we support to have the same operation multiple times in a query. E.g. ``` .window(Session withGap 3.milli on 'rowtime as 'w) .select('string, 'w.end, 'w.end) ``` Your code would fail. Let's leave the aggregation as it is for now and rework it again later. The aggregations have to be reworked anyway for efficiency. > Add group-windows for streaming tables > --- > > Key: FLINK-4691 > URL: https://issues.apache.org/jira/browse/FLINK-4691 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Add Tumble, Slide, Session group-windows for streaming tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > > Implementation of group-windows on streaming tables. This includes > implementing the API of group-windows, the logical validation for > group-windows, and the definition of the “rowtime” and “systemtime” keywords. > Group-windows on batch tables won’t be initially supported and will throw an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4813) Having flink-test-utils as a dependency outside Flink fails the build
[ https://issues.apache.org/jira/browse/FLINK-4813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4813: -- Description: The {{flink-test-utils}} depend on {{hadoop-minikdc}}, which has a dependency, which is only resolvable, if the {{maven-bundle-plugin}} is loaded. This is the error message {code} [ERROR] Failed to execute goal on project quickstart-1.2-tests: Could not resolve dependencies for project com.dataartisans:quickstart-1.2-tests:jar:1.0-SNAPSHOT: Failure to find org.apache.directory.jdbm:apacheds-jdbm1:bundle:2.0.0-M2 in https://repo.maven.apache.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of central has elapsed or updates are forced -> [Help 1] {code} {{flink-parent}} loads that plugin, so all "internal" dependencies to the test utils can resolve the plugin. Right now, users have to use the maven bundle plugin to use our test utils externally. By making the hadoop minikdc dependency optional, we can probably resolve the issues. Then, only users who want to use the security-related tools in the test utils need to manually add the hadoop minikdc dependency + the plugin. was: The {{flink-test-utils}} depend on {{hadoop-minikdc}}, which has a dependency, which is only resolvable, if the {{maven-bundle-plugin}} is loaded. {{flink-parent}} loads that plugin, so all "internal" dependencies to the test utils can resolve the plugin. Right now, users have to use the maven bundle plugin to use our test utils externally. By making the hadoop minikdc dependency optional, we can probably resolve the issues. Then, only users who want to use the security-related tools in the test utils need to manually add the hadoop minikdc dependency + the plugin. > Having flink-test-utils as a dependency outside Flink fails the build > - > > Key: FLINK-4813 > URL: https://issues.apache.org/jira/browse/FLINK-4813 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.2.0 >Reporter: Robert Metzger > > The {{flink-test-utils}} depend on {{hadoop-minikdc}}, which has a > dependency, which is only resolvable, if the {{maven-bundle-plugin}} is > loaded. > This is the error message > {code} > [ERROR] Failed to execute goal on project quickstart-1.2-tests: Could not > resolve dependencies for project > com.dataartisans:quickstart-1.2-tests:jar:1.0-SNAPSHOT: Failure to find > org.apache.directory.jdbm:apacheds-jdbm1:bundle:2.0.0-M2 in > https://repo.maven.apache.org/maven2 was cached in the local repository, > resolution will not be reattempted until the update interval of central has > elapsed or updates are forced -> [Help 1] > {code} > {{flink-parent}} loads that plugin, so all "internal" dependencies to the > test utils can resolve the plugin. > Right now, users have to use the maven bundle plugin to use our test utils > externally. > By making the hadoop minikdc dependency optional, we can probably resolve the > issues. Then, only users who want to use the security-related tools in the > test utils need to manually add the hadoop minikdc dependency + the plugin. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568701#comment-15568701 ] Robert Metzger commented on FLINK-4280: --- This issue is related: https://issues.apache.org/jira/browse/FLINK-3037 > New Flink-specific option to set starting position of Kafka consumer without > respecting external offsets in ZK / Broker > --- > > Key: FLINK-4280 > URL: https://issues.apache.org/jira/browse/FLINK-4280 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Currently, to start reading from the "earliest" and "latest" position in > topics for the Flink Kafka consumer, users set the Kafka config > {{auto.offset.reset}} in the provided properties configuration. > However, the way this config actually works might be a bit misleading if > users were trying to find a way to "read topics from a starting position". > The way the {{auto.offset.reset}} config works in the Flink Kafka consumer > resembles Kafka's original intent for the setting: first, existing external > offsets committed to the ZK / brokers will be checked; if none exists, then > will {{auto.offset.reset}} be respected. > I propose to add Flink-specific ways to define the starting position, without > taking into account the external offsets. The original behaviour (reference > external offsets first) can be changed to be a user option, so that the > behaviour can be retained for frequent Kafka users that may need some > collaboration with existing non-Flink Kafka consumer applications. > How users will interact with the Flink Kafka consumer after this is added, > with a newly introduced {{flink.starting-position}} config: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "earliest/latest"); > props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a > warning) > props.setProperty("group.id", "...") // this won't have effect on the > starting position anymore (may still be used in external offset committing) > ... > {code} > Or, reference external offsets in ZK / broker: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "external-offsets"); > props.setProperty("auto.offset.reset", "earliest/latest"); // default will be > latest > props.setProperty("group.id", "..."); // will be used to lookup external > offsets in ZK / broker on startup > ... > {code} > A thing we would need to decide on is what would the default value be for > {{flink.starting-position}}. > Two merits I see in adding this: > 1. This compensates the way users generally interpret "read from a starting > position". As the Flink Kafka connector is somewhat essentially a > "high-level" Kafka consumer for Flink users, I think it is reasonable to add > Flink-specific functionality that users will find useful, although it wasn't > supported in Kafka's original consumer designs. > 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is > used only to expose progress to the outside world, and not used to manipulate > how Kafka topics are read in Flink (unless users opt to do so)" is even more > definite and solid. There was some discussion in this PR > (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. I > think adding this "decouples" more Flink's internal offset checkpointing from > the external Kafka's offset store. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
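For context, a minimal end-to-end sketch of how the proposed property could be passed to a job, using the existing FlinkKafkaConsumer08 constructor; note that `flink.starting-position` is only the key proposed in this issue and does not exist yet, so the line setting it is illustrative only:
```java
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class StartingPositionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("group.id", "test-group");
        // Proposed (not yet existing) key from this issue; external offsets would be ignored.
        props.setProperty("flink.starting-position", "earliest");

        env.addSource(new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props))
           .print();

        env.execute("starting-position sketch");
    }
}
```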
[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568703#comment-15568703 ] ASF GitHub Bot commented on FLINK-4280: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2509#discussion_r83003348 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java --- @@ -345,16 +343,19 @@ protected static void validateZooKeeperConfig(Properties props) { } } - private static long getInvalidOffsetBehavior(Properties config) { + /** +* Check for invalid "auto.offset.reset" values. Should be called in constructor for eager checking before submitting +* the job. Note that 'none' is also considered invalid, as we don't want to deliberately throw an exception --- End diff -- Thank you for the pointer. We are discussing this issue here https://issues.apache.org/jira/browse/FLINK-4280 and here https://issues.apache.org/jira/browse/FLINK-3037 > New Flink-specific option to set starting position of Kafka consumer without > respecting external offsets in ZK / Broker > --- > > Key: FLINK-4280 > URL: https://issues.apache.org/jira/browse/FLINK-4280 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Currently, to start reading from the "earliest" and "latest" position in > topics for the Flink Kafka consumer, users set the Kafka config > {{auto.offset.reset}} in the provided properties configuration. > However, the way this config actually works might be a bit misleading if > users were trying to find a way to "read topics from a starting position". > The way the {{auto.offset.reset}} config works in the Flink Kafka consumer > resembles Kafka's original intent for the setting: first, existing external > offsets committed to the ZK / brokers will be checked; if none exists, then > will {{auto.offset.reset}} be respected. > I propose to add Flink-specific ways to define the starting position, without > taking into account the external offsets. The original behaviour (reference > external offsets first) can be changed to be a user option, so that the > behaviour can be retained for frequent Kafka users that may need some > collaboration with existing non-Flink Kafka consumer applications. > How users will interact with the Flink Kafka consumer after this is added, > with a newly introduced {{flink.starting-position}} config: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "earliest/latest"); > props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a > warning) > props.setProperty("group.id", "...") // this won't have effect on the > starting position anymore (may still be used in external offset committing) > ... > {code} > Or, reference external offsets in ZK / broker: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "external-offsets"); > props.setProperty("auto.offset.reset", "earliest/latest"); // default will be > latest > props.setProperty("group.id", "..."); // will be used to lookup external > offsets in ZK / broker on startup > ... > {code} > A thing we would need to decide on is what would the default value be for > {{flink.starting-position}}. > Two merits I see in adding this: > 1. This compensates the way users generally interpret "read from a starting > position". 
As the Flink Kafka connector is somewhat essentially a > "high-level" Kafka consumer for Flink users, I think it is reasonable to add > Flink-specific functionality that users will find useful, although it wasn't > supported in Kafka's original consumer designs. > 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is > used only to expose progress to the outside world, and not used to manipulate > how Kafka topics are read in Flink (unless users opt to do so)" is even more > definite and solid. There was some discussion in this PR > (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. I > think adding this "decouples" more Flink's internal offset checkpointing from > the external Kafka's offset store. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
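To make the eager check discussed in the diff above concrete, here is a small self-contained sketch (not the actual Flink code) that validates "auto.offset.reset" at construction time and also rejects 'none', so that a bad value fails at job submission rather than at runtime:
```java
import java.util.Properties;

// Sketch only: eager validation of "auto.offset.reset" in the consumer constructor,
// so an invalid value (including "none") is reported before the job is submitted.
public class OffsetResetValidation {

    static void validateAutoOffsetResetValue(Properties config) {
        String val = config.getProperty("auto.offset.reset", "largest");
        if (!val.equals("largest") && !val.equals("latest")
                && !val.equals("smallest") && !val.equals("earliest")) {
            throw new IllegalArgumentException(
                "Unsupported value '" + val + "' for 'auto.offset.reset'; "
                + "expected 'largest'/'latest' or 'smallest'/'earliest'.");
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("auto.offset.reset", "none"); // rejected eagerly
        validateAutoOffsetResetValue(props);
    }
}
```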
[GitHub] flink issue #2623: [FLINK-2608] Updated Twitter Chill version.
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2623 This change adds `kryo-shaded` to our dependency tree:
```
[INFO] |  +- com.twitter:chill_2.10:jar:0.8.1:compile
[INFO] |  |  +- com.twitter:chill-java:jar:0.8.1:compile
[INFO] |  |  \- com.esotericsoftware:kryo-shaded:jar:3.0.3:compile
[INFO] |  |     \- com.esotericsoftware:minlog:jar:1.3.0:compile
```
I suspect that Maven does not flag this as a conflict with our existing Kryo dependency because chill depends on `kryo-shaded`, which is a different artifact. Apparently, `kryo-shaded` has a shaded ASM version included, but it does not relocate the regular Kryo classes, so we'll end up with two Kryo versions on our classpath. If we want to upgrade Kryo, we therefore need to do it explicitly to avoid that. Another issue we need to consider is serialization compatibility: savepoints in Flink could potentially contain data serialized with Kryo 2.24, and if we want to provide savepoint compatibility between Flink 1.1 and 1.2, we need to account for that. According to the Kryo documentation, 2.24 to 3.0.0 is serialization compatible (I hope the same holds true for chill): https://github.com/EsotericSoftware/kryo/blob/master/CHANGES.md#2240---300-2014-10-04 I would like to hear @StephanEwen and @uce's opinion on this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
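One quick way to confirm the "two Kryo versions" concern in a given deployment is to list every classpath location that provides the non-relocated Kryo class; a small, hypothetical diagnostic (not part of this PR):
```java
import java.net.URL;
import java.util.Enumeration;

// Hypothetical diagnostic: prints every classpath entry that contains the
// non-relocated Kryo class. Two or more hits would mean two Kryo versions
// are visible at the same time, as described in the comment above.
public class KryoOnClasspath {
    public static void main(String[] args) throws Exception {
        Enumeration<URL> urls = KryoOnClasspath.class.getClassLoader()
                .getResources("com/esotericsoftware/kryo/Kryo.class");
        while (urls.hasMoreElements()) {
            System.out.println(urls.nextElement());
        }
    }
}
```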
[jira] [Commented] (FLINK-4813) Having flink-test-utils as a dependency outside Flink fails the build
[ https://issues.apache.org/jira/browse/FLINK-4813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568803#comment-15568803 ] Stephan Ewen commented on FLINK-4813: - +1 > Having flink-test-utils as a dependency outside Flink fails the build > - > > Key: FLINK-4813 > URL: https://issues.apache.org/jira/browse/FLINK-4813 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.2.0 >Reporter: Robert Metzger > > The {{flink-test-utils}} depend on {{hadoop-minikdc}}, which has a > dependency, which is only resolvable, if the {{maven-bundle-plugin}} is > loaded. > This is the error message > {code} > [ERROR] Failed to execute goal on project quickstart-1.2-tests: Could not > resolve dependencies for project > com.dataartisans:quickstart-1.2-tests:jar:1.0-SNAPSHOT: Failure to find > org.apache.directory.jdbm:apacheds-jdbm1:bundle:2.0.0-M2 in > https://repo.maven.apache.org/maven2 was cached in the local repository, > resolution will not be reattempted until the update interval of central has > elapsed or updates are forced -> [Help 1] > {code} > {{flink-parent}} loads that plugin, so all "internal" dependencies to the > test utils can resolve the plugin. > Right now, users have to use the maven bundle plugin to use our test utils > externally. > By making the hadoop minikdc dependency optional, we can probably resolve the > issues. Then, only users who want to use the security-related tools in the > test utils need to manually add the hadoop minikdc dependency + the plugin. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4717) Naive version of atomic stop signal with savepoint
[ https://issues.apache.org/jira/browse/FLINK-4717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568548#comment-15568548 ] ASF GitHub Bot commented on FLINK-4717: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2609 Addressed the issue and found another bug in #2608 that I've fixed in f769e8e. If Travis gives the green light, I will rebase on #2608 and merge this later today. > Naive version of atomic stop signal with savepoint > -- > > Key: FLINK-4717 > URL: https://issues.apache.org/jira/browse/FLINK-4717 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Priority: Minor > Fix For: 1.2.0 > > > As a first step towards atomic stopping with savepoints we should implement a > cancel command which prior to cancelling takes a savepoint. Additionally, it > should turn off the periodic checkpointing so that there won't be checkpoints > executed between the savepoint and the cancel command. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2626: [FLINK-4787] [runtime-web] Expose cancel-with-save...
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/2626 [FLINK-4787] [runtime-web] Expose cancel-with-savepoint via REST API Follow up to #2609, exposing the cancel-with-savepoint command via the REST API. The relevant commits are the last two. The `RequestHandler` now returns a generic `HttpResponse` instead of a `String`. This enables handlers to return custom responses (different http codes, etc.). Now most handlers extend the `AbstractJsonRequestHandler` for default JSON responses (which used to be handled by the generic `RequestHandler`). Adds handlers for triggering and monitoring a job cancellation with savepoints. Since this operation can take some time, we do this asynchronously. According to various online resources, the way to go for REST APIs in such cases is to return HTTP 202 Accepted with the location of the in-progress operation. You can merge this pull request into a Git repository by running: $ git pull https://github.com/uce/flink 4787-cancel_with_savepoint_rest_api Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2626.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2626 commit 99a9621383e6a223e39e4ec22d60671a205d958d Author: Ufuk Celebi Date: 2016-10-06T14:43:42Z [FLINK-4512] [FLIP-10] Add option to persist periodic checkpoints [FLINK-4509] [FLIP-10] Specify savepoint directory per savepoint [FLINK-4507] [FLIP-10] Deprecate savepoint backend config commit 25ffc04d7e5d7ef9538447ee5162c0d203e96e89 Author: Ufuk Celebi Date: 2016-10-07T09:48:47Z [FLINK-4717] Add CancelJobWithSavepoint - Adds CancelJobWithSavepoint message, which triggers a savepoint before cancelling the respective job. - Adds -s [targetDirectory] option to CLI cancel command: * bin/flink cancel (regular cancelling) * bin/flink cancel -s (cancel with savepoint to default dir) * bin/flink cancel -s (cancel with savepoint to targetDir) commit bc88dba90a263e80691448e20644e5f126551bb6 Author: Ufuk Celebi Date: 2016-10-11T08:08:14Z [FLINK-4787] [runtime-web] Return generic HttpResponse in RequestHandler - Let RequestHandler return a generic HttpResponse instead of a String. This enables handlers to return custom responses (different http codes, etc.) - Introduce AbstractJsonRequestHandler for default JSON responses commit ecbcf46f5a9d874dbdd908d48c7035c1cb338c1a Author: Ufuk Celebi Date: 2016-10-11T08:09:20Z [FLINK-4787] [runtime-web] Add JobCancellationWithSavepointHandlers - Add handlers for triggering and monitoring job cancellation with savepoints. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
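A rough sketch of the "accepted plus location" response pattern described above, using plain Netty types; the path in `IN_PROGRESS_LOCATION` is made up for illustration and this is not the actual handler code from the PR:
```java
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;

public class AcceptedResponseSketch {

    // Made-up location of the in-progress cancellation; a real handler would
    // derive this from the job id and an internal request id.
    static final String IN_PROGRESS_LOCATION =
            "/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId";

    // Builds an HTTP 202 Accepted response pointing at the in-progress operation.
    static FullHttpResponse acceptedResponse() {
        byte[] body = "{\"status\":\"accepted\"}".getBytes(CharsetUtil.UTF_8);
        FullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1,
                HttpResponseStatus.ACCEPTED,
                Unpooled.wrappedBuffer(body));
        response.headers().set("Location", IN_PROGRESS_LOCATION);
        response.headers().set("Content-Type", "application/json");
        response.headers().set("Content-Length", body.length);
        return response;
    }
}
```
A client would then poll the returned location until the savepoint path (or a failure) is available.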
[jira] [Commented] (FLINK-3706) YARNSessionCapacitySchedulerITCase.testNonexistingQueue unstable
[ https://issues.apache.org/jira/browse/FLINK-3706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568635#comment-15568635 ] Robert Metzger commented on FLINK-3706: --- Fix for release-1.1: http://git-wip-us.apache.org/repos/asf/flink/commit/c9433bf6 > YARNSessionCapacitySchedulerITCase.testNonexistingQueue unstable > > > Key: FLINK-3706 > URL: https://issues.apache.org/jira/browse/FLINK-3706 > Project: Flink > Issue Type: Bug > Components: YARN Client >Reporter: Aljoscha Krettek >Assignee: Robert Metzger >Priority: Critical > Labels: test-stability > Fix For: 1.2.0 > > Attachments: log.txt > > > I encountered a failed test on travis. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4813) Having flink-test-utils as a dependency outside Flink fails the build
Robert Metzger created FLINK-4813: - Summary: Having flink-test-utils as a dependency outside Flink fails the build Key: FLINK-4813 URL: https://issues.apache.org/jira/browse/FLINK-4813 Project: Flink Issue Type: Bug Components: Build System Affects Versions: 1.2.0 Reporter: Robert Metzger {{flink-test-utils}} depends on {{hadoop-minikdc}}, which has a transitive dependency that is only resolvable if the {{maven-bundle-plugin}} is loaded. {{flink-parent}} loads that plugin, so all "internal" dependents of the test utils inside Flink can resolve it. Right now, users have to use the maven bundle plugin to use our test utils externally. By making the hadoop-minikdc dependency optional, we can probably resolve the issue. Then, only users who want to use the security-related tools in the test utils need to manually add the hadoop-minikdc dependency + the plugin. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2627: Kafka 0.10 follow-up fixes
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2627 Thanks a lot for fixing this. +1 from my side --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4669) scala api createLocalEnvironment() function add default Configuration parameter
[ https://issues.apache.org/jira/browse/FLINK-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15567867#comment-15567867 ] ASF GitHub Bot commented on FLINK-4669: --- Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/2541 @StephanEwen I've added a new function named `createCustomLocalEnv`. Sorry for the late ack. > scala api createLocalEnvironment() function add default Configuration > parameter > --- > > Key: FLINK-4669 > URL: https://issues.apache.org/jira/browse/FLINK-4669 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: shijinkui > > A Scala program can't directly use createLocalEnvironment with a custom Configuration > object. > For example, to start the web server in local mode, I have to do: > ``` > // set up execution environment > val conf = new Configuration > conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true) > conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, > ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT) > val env = new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment( > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(2, > conf) > ) > ``` > so the createLocalEnvironment function needs a config parameter -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2541: [FLINK-4669] scala api createLocalEnvironment() function ...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/2541 @StephanEwen I've added a new function named `createCustomLocalEnv`. Sorry for the late ack. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
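For comparison, the Java API already accepts a Configuration in `createLocalEnvironment`; a short sketch of the equivalent setup, ported from the Scala workaround quoted in the issue description above (same constants, Java API):
```java
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalEnvWithWebUI {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
                ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);

        // The Java API already takes a Configuration here; the issue asks for an
        // equally direct way in the Scala API (e.g. the proposed createCustomLocalEnv).
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(2, conf);

        env.fromElements(1, 2, 3).print();
        env.execute("local env with web UI");
    }
}
```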
[jira] [Created] (FLINK-4814) Remove extra storage location for externalized checkpoint metadata
Ufuk Celebi created FLINK-4814: -- Summary: Remove extra storage location for externalized checkpoint metadata Key: FLINK-4814 URL: https://issues.apache.org/jira/browse/FLINK-4814 Project: Flink Issue Type: Improvement Reporter: Ufuk Celebi Follow-up for FLINK-4512. Store the checkpoint metadata in the checkpoint directory. That makes it simpler for users to track and clean up checkpoints manually, if they want to retain externalized checkpoints across cancellations and terminal failures. Every state backend needs to be able to provide a storage location for the checkpoint metadata. The memory state backend would hence not work with externalized checkpoints, unless one explicitly sets a parameter like `setExternalizedCheckpointsLocation(uri)`. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4816) Executions from "DEPLOYING" should retain restored checkpoint information
Stephan Ewen created FLINK-4816: --- Summary: Executions from "DEPLOYING" should retain restored checkpoint information Key: FLINK-4816 URL: https://issues.apache.org/jira/browse/FLINK-4816 Project: Flink Issue Type: Sub-task Components: Distributed Coordination Reporter: Stephan Ewen When an execution fails from state {{DEPLOYING}}, it should wrap the failure to better report the failure cause: - If no checkpoint was restored, it should wrap the exception in a {{DeployTaskException}} - If a checkpoint was restored, it should wrap the exception in a {{RestoreTaskException}} and record the id of the checkpoint that was attempted to be restored. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
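A self-contained sketch of the wrapping described in the issue; `DeployTaskException` and `RestoreTaskException` are the exception types proposed here and are not (yet) part of the Flink code base:
```java
// Sketch only: illustrates wrapping a failure that occurs while an execution is
// in state DEPLOYING, recording the restored checkpoint id when one was attempted.
public class DeployingFailureWrapping {

    static class DeployTaskException extends Exception {
        DeployTaskException(Throwable cause) {
            super("Task failed while being deployed", cause);
        }
    }

    static class RestoreTaskException extends Exception {
        final long checkpointId;
        RestoreTaskException(long checkpointId, Throwable cause) {
            super("Task failed while restoring checkpoint " + checkpointId, cause);
            this.checkpointId = checkpointId;
        }
    }

    /** Wraps a failure from state DEPLOYING; restoredCheckpointId is null if no restore happened. */
    static Exception wrapDeployingFailure(Throwable cause, Long restoredCheckpointId) {
        if (restoredCheckpointId == null) {
            return new DeployTaskException(cause);
        } else {
            return new RestoreTaskException(restoredCheckpointId, cause);
        }
    }

    public static void main(String[] args) {
        System.out.println(wrapDeployingFailure(new RuntimeException("boom"), 42L).getMessage());
    }
}
```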
[jira] [Closed] (FLINK-3844) Checkpoint failures should not always lead to job failures
[ https://issues.apache.org/jira/browse/FLINK-3844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-3844. --- > Checkpoint failures should not always lead to job failures > -- > > Key: FLINK-3844 > URL: https://issues.apache.org/jira/browse/FLINK-3844 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Gyula Fora > > Currently when a checkpoint fails the job crashes immediately. This is not > the desired behaviour in many cases. It would probably be better to log the > failed checkpoint attempt and only fail the job after so many subsequent > failed attempts. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-3844) Checkpoint failures should not always lead to job failures
[ https://issues.apache.org/jira/browse/FLINK-3844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-3844. - Resolution: Duplicate Duplicate of FLINK-4809. The other issue already has fine-grained subtasks and a more detailed description. > Checkpoint failures should not always lead to job failures > -- > > Key: FLINK-3844 > URL: https://issues.apache.org/jira/browse/FLINK-3844 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Gyula Fora > > Currently when a checkpoint fails the job crashes immediately. This is not > the desired behaviour in many cases. It would probably be better to log the > failed checkpoint attempt and only fail the job after so many subsequent > failed attempts. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-3594) StreamTask may fail when checkpoint is concurrent to regular termination
[ https://issues.apache.org/jira/browse/FLINK-3594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-3594. --- > StreamTask may fail when checkpoint is concurrent to regular termination > > > Key: FLINK-3594 > URL: https://issues.apache.org/jira/browse/FLINK-3594 > Project: Flink > Issue Type: Bug >Reporter: Chesnay Schepler >Assignee: Stephan Ewen >Priority: Critical > Labels: test-stability > Fix For: 1.1.0 > > > Some tests in the KafkaConsumerTestBase rely on throwing a SuccessException > to stop the streaming job if the test condition is fulfilled. > The job then fails, and it is checked whether the cause was a > SuccessException. If so, the test is marked as a success, otherwise as a > failure. > However, should this exception be thrown while a checkpoint is being > triggered, the exception that stops the job is not the SuccessException, but a > CancelTaskException. > This should affect every test that uses the SuccessException. > Observed here: https://travis-ci.org/apache/flink/jobs/114523189 > The problem is that the exception causes the StreamTask to enter the finally > block inside invoke(), which sets isRunning to false. Within > triggerCheckpoint() isRunning is then checked for being false, and if so a > CancelTaskException is thrown. > This seems like an issue in the runtime; I observed other tests failing > without giving a good cause, since the CancelTaskException masks it. > I was wondering whether triggerCheckpoint() could return false instead of > throwing an exception, and simply assume that an exception will be thrown > within invoke(). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
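A minimal sketch of the alternative suggested at the end of the description, declining the checkpoint instead of throwing once the task has left its running state; this is simplified, hypothetical code, not the actual StreamTask implementation:
```java
// Hypothetical simplification of the idea from the issue: when the task is no
// longer running, decline the checkpoint by returning false instead of throwing
// CancelTaskException, so the original exception in invoke() is not masked.
public class CheckpointDecliningSketch {

    private final Object lock = new Object();
    private volatile boolean isRunning = true;

    public boolean triggerCheckpoint(long checkpointId, long timestamp) {
        synchronized (lock) {
            if (!isRunning) {
                // Task is finishing or failing concurrently; decline the checkpoint.
                return false;
            }
            // ... perform the actual checkpoint here ...
            return true;
        }
    }

    public void stop() {
        synchronized (lock) {
            isRunning = false;
        }
    }
}
```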