[jira] [Resolved] (FLINK-11986) Add micro benchmark for state operations
[ https://issues.apache.org/jira/browse/FLINK-11986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yu Li resolved FLINK-11986. --- Resolution: Implemented Fix Version/s: 1.9.0 PR already merged and integrated into our Jenkins and [SpeedCenter|http://codespeed.dak8s.net:8000/timeline/#/?exe=3&ben=grid&env=2&revs=200&equid=off&quarts=on&extr=on], closing issue. Thanks [~pnowojski] for reviewing and merging the PR, and thanks [~srichter] for the review. > Add micro benchmark for state operations > > > Key: FLINK-11986 > URL: https://issues.apache.org/jira/browse/FLINK-11986 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends, Tests >Reporter: Yu Li >Assignee: Yu Li >Priority: Major > Fix For: 1.9.0 > > > Currently in the > [flink-benchmark|https://github.com/dataArtisans/flink-benchmarks] project we > already have a JMH case for the whole backend, but none for finer-grained > state operations, and here we propose adding JMH cases for them, including > (but not limited to): > * ValueState > ** testPut > ** testGet > * ListState > ** testUpdate > ** testGet > ** testAddAll > * MapState > ** testPut > ** testGet > ** testContains > ** testKeys > ** testValues > ** testEntries > ** testIterator > ** testRemove > ** testPutAll > And we will create benchmarks for {{HeapKeyedStateBackend}} and > {{RocksDBKeyedStateBackend}} separately. > We believe these micro benchmarks could help locate where a regression > comes from in case any is observed in the backend benchmark, and we could also > use these benchmarks to ensure no performance regression when modifying > related code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
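A minimal sketch of what one of these finer-grained JMH cases could look like, here for ValueState testPut/testGet. The {{KeyedStateHarness}} helper, its factory and method names, and the key-space size are hypothetical placeholders for the backend setup that the flink-benchmarks harness provides; this is not the merged code.
{code:java}
import org.apache.flink.api.common.state.ValueState;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Thread)
public class ValueStateBenchmark {

    // Hypothetical helper wrapping a HeapKeyedStateBackend or RocksDBKeyedStateBackend;
    // the real flink-benchmarks project has its own setup utilities for this.
    private KeyedStateHarness harness;
    private ValueState<Long> valueState;
    private long key;

    @Setup
    public void setUp() throws Exception {
        harness = KeyedStateHarness.create("heap");           // assumed factory
        valueState = harness.createValueState("bench-value"); // assumed helper
    }

    @Benchmark
    public void testPut() throws Exception {
        harness.setCurrentKey(key++ % 1_000); // switch keys to exercise the backend (assumed helper)
        valueState.update(key);
    }

    @Benchmark
    public Long testGet() throws Exception {
        harness.setCurrentKey(key++ % 1_000);
        return valueState.value();
    }
}
{code}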
[jira] [Commented] (FLINK-12229) Implement Lazy Scheduling Strategy
[ https://issues.apache.org/jira/browse/FLINK-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830880#comment-16830880 ] BoWang commented on FLINK-12229: Thanks [~till.rohrmann]. The advantage of only looking at the input result partitions is obvious, but I am wondering whether there may be some negative effects. 1) Once any result partition finishes, the consumer vertex will be scheduled and the rest of the result partition infos would be sent to the TM separately. That would mean a lot of additional network communication for updating partition info if the number of input partitions is huge, whereas when looking at the IntermediateDataSet all the partition infos are composed into the `TaskDeploymentDescriptor`. 2) There may be a resource deadlock, e.g., consider a job with map-reduce-join job vertices. Both the reduce and join job vertices have ANY input constraints, so parts of the tasks could be scheduled but cannot finish until all the input result partitions are ready. When the free resources of the cluster are not enough, the running join vertices are waiting for all their inputs while part of the reduce vertices are waiting for resources to be scheduled. Making SchedulingIntermediateDataSet part of LazyFromSourcesSchedulingStrategy would work; I will do it like this. > Implement Lazy Scheduling Strategy > -- > > Key: FLINK-12229 > URL: https://issues.apache.org/jira/browse/FLINK-12229 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Gary Yao >Assignee: BoWang >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Implement a {{SchedulingStrategy}} that covers the functionality of > {{ScheduleMode.LAZY_FROM_SOURCES}}, i.e., vertices are scheduled when all the > input data are available. > Acceptance Criteria: > * New strategy is tested in isolation using test implementations (i.e., > without having to submit a job) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12342) Yarn Resource Manager Acquires Too Many Containers
[ https://issues.apache.org/jira/browse/FLINK-12342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830875#comment-16830875 ] Zhenqiu Huang commented on FLINK-12342: --- [~till.rohrmann] After reading the AMRMClientAsync code, I found that the resource request is actually sent with each heartbeat. If the existing pending requests N are not removed yet, the newly added request is counted as N+1 and sent to the RM. For the issue we observe, I think it is caused by FAST_YARN_HEARTBEAT_INTERVAL_MS = 500 being set during the resource allocation triggered by the SlotManager. Somehow the number of containers allocated is always less than the number of pending requests within 500 milliseconds, so each fast heartbeat asks for an extra number of containers. If we change FAST_YARN_HEARTBEAT_INTERVAL_MS to 2000 ms and wait for more containers to be returned before sending another heartbeat, we can definitely reduce the total number of requested containers. Thus, the solution I would like to propose is to make FAST_YARN_HEARTBEAT_INTERVAL_MS one of the YarnConfigOptions, so that the parameter can be tuned according to the size of the job/cluster. What do you think? > Yarn Resource Manager Acquires Too Many Containers > -- > > Key: FLINK-12342 > URL: https://issues.apache.org/jira/browse/FLINK-12342 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.6.4, 1.7.2, 1.8.0 > Environment: We run jobs in Flink release 1.6.3. >Reporter: Zhenqiu Huang >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-available > Attachments: Screen Shot 2019-04-29 at 12.06.23 AM.png, > container.log, flink-1.4.png, flink-1.6.png > > Time Spent: 10m > Remaining Estimate: 0h > > In the current implementation of YarnFlinkResourceManager, it starts to acquire > new containers one by one when it gets requests from the SlotManager. The mechanism > works when the job is small, say less than 32 containers. If the job has 256 > containers, containers can't be immediately allocated and pending requests > in AMRMClient will not be removed accordingly. We observe the situation that > AMRMClient asks for current pending requests + 1 (the new request from the slot > manager) containers. In this way, during the start time of such a job, it asked > for 4000+ containers. If an external dependency issue happens, for > example hdfs access is slow, then the whole job will be blocked without > getting enough resources and finally killed with a SlotManager request timeout. > Thus, we should use the total number of containers asked for rather than the pending > requests in AMRMClient as the threshold to decide whether we need to add > one more resource request. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
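A minimal sketch of what exposing that interval as a config option could look like, assuming the existing {{ConfigOptions}} builder API; the key name, default value, and description below are placeholders rather than the final option:
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class YarnConfigOptions {

    // Placeholder key/default: exposes the fast heartbeat interval used while
    // container requests are pending, so it can be tuned per job/cluster size.
    public static final ConfigOption<Integer> CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MS =
        ConfigOptions.key("yarn.heartbeat.container-request-interval")
            .defaultValue(500)
            .withDescription(
                "Heartbeat interval in milliseconds between the Flink ResourceManager and YARN "
                    + "while there are pending container requests.");
}
{code}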
[jira] [Commented] (FLINK-12260) Slot allocation failure by taskmanager registration timeout and race
[ https://issues.apache.org/jira/browse/FLINK-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830862#comment-16830862 ] Hwanju Kim commented on FLINK-12260: Thanks for the clarification. I thought you meant it without introducing any additional map, but now it seems clear. I had tried thinking conservative approach as I couldn't 100% rule out the possibility of sender-side race. As we may have a potential simpler solution, I looked at the code again a little further. Initially what led me to any possibility of race is this part: {code:java} val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, sender) actorRef.tell(message, a) a.result.future {code} This is internalAsk from invokeRpc and PromiseActorRef internally does scheduler.scheduleOnce(timeout.duration) for the timer. The tell of actorRef is sending a message to RM through RemoteActorRef and EndpointManager where the message is passed to Dispatcher, which enqueues the message to mbox and executes mbox via executor thread. My impression was that as tell is asynchronous via executor service, the timer of PromiseActorRef set up before can fire before the message hit the road off the sender. Although that'd be possible, the message at least seems to be enqueued to mbox for RM endpoint and thus the order can be preserved against the next attempt after timeout. So, the ordering seems fine. In addition I was also concerned the case where two different ask calls might happen to use two different TCP connections leading any possible out-of-order delivery. Although not 100% exercising the relevant code, it seems to use a single connection associated by akka endpoints and I checked that's true by packet capture. So, based on the code inspection and no successful repro on sender-side, we can currently conclude that the race is likely happening in task executor connection/handshake on the receiver-side (as repro does). I will test it out with the Till's proposal. On our side, once this fix ends up being applied, we can keep eyes on our test apps, which intermittently hit this issue, to see if there's any other race issue. > Slot allocation failure by taskmanager registration timeout and race > > > Key: FLINK-12260 > URL: https://issues.apache.org/jira/browse/FLINK-12260 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.6.3 >Reporter: Hwanju Kim >Priority: Critical > Attachments: FLINK-12260-repro.diff > > > > In 1.6.2., we have seen slot allocation failure keep happening for long time. > Having looked at the log, I see the following behavior: > # TM sends a registration request R1 to resource manager. > # R1 times out after 100ms, which is initial timeout. > # TM retries a registration request R2 to resource manager (with timeout > 200ms). > # R2 arrives first at resource manager and registered, and then TM gets > successful response moving onto step 5 below. > # On successful registration, R2's instance is put to > taskManagerRegistrations > # Then R1 arrives at resource manager and realizes the same TM resource ID > is already registered, which then unregisters R2's instance ID from > taskManagerRegistrations. A new instance ID for R1 is registered to > workerRegistration. > # R1's response is not handled though since it already timed out (see akka > temp actor resolve failure below), hence no registration to > taskManagerRegistrations. > # TM keeps heartbeating to the resource manager with slot status. 
> # Resource manager ignores this slot status, since taskManagerRegistrations > contains R2, not R1, which replaced R2 in workerRegistration at step 6. > # Slot request can never be fulfilled, timing out. > The following is the debug logs for the above steps: > > {code:java} > JM log: > 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID > 46c8e0d0fcf2c306f11954a1040d5677 > (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at > ResourceManager > 2019-04-11 22:39:40.000,Registering TaskManager > 46c8e0d0fcf2c306f11954a1040d5677 under deade132e2c41c52019cdc27977266cf at > the SlotManager. > 2019-04-11 22:39:40.000,Replacing old registration of TaskExecutor > 46c8e0d0fcf2c306f11954a1040d5677. > 2019-04-11 22:39:40.000,Unregister TaskManager > deade132e2c41c52019cdc27977266cf from the SlotManager. > 2019-04-11 22:39:40.000,Registering TaskManager with ResourceID > 46c8e0d0fcf2c306f11954a1040d5677 > (akka.ssl.tcp://flink@flink-taskmanager:6122/user/taskmanager_0) at > ResourceManager > TM log: > 2019-04-11 22:39:40.000,Registration at ResourceManager attempt 1 > (timeout=100ms) > 2019-04-11 22:39:40.000,Registration at ResourceMan
[GitHub] [flink] xuefuz commented on issue #8314: [FLINK-12365][table] Add stats related catalog APIs
xuefuz commented on issue #8314: [FLINK-12365][table] Add stats related catalog APIs URL: https://github.com/apache/flink/pull/8314#issuecomment-488209732 did a forced push and now it seems good. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz removed a comment on issue #8314: [FLINK-12365][table] Add stats related catalog APIs
xuefuz removed a comment on issue #8314: [FLINK-12365][table] Add stats related catalog APIs URL: https://github.com/apache/flink/pull/8314#issuecomment-488206199 > Hi @xuefuz , can you properly rebase onto master? The commit history has some unrelated commits. Which unrelated commits did you see? I just rebased and rebuilt, and everything seems to be working fine for me. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on issue #8314: [FLINK-12365][table] Add stats related catalog APIs
xuefuz commented on issue #8314: [FLINK-12365][table] Add stats related catalog APIs URL: https://github.com/apache/flink/pull/8314#issuecomment-488206199 > Hi @xuefuz , can you properly rebase onto master? The commit history has some unrelated commits. Which unrelated commits did you see? I just rebased and rebuilt, and everything seems to be working fine for me. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on issue #8314: [FLINK-12365][table] Add stats related catalog APIs
bowenli86 commented on issue #8314: [FLINK-12365][table] Add stats related catalog APIs URL: https://github.com/apache/flink/pull/8314#issuecomment-488205589 Hi @xuefuz , can you properly rebase onto master? The commit history has some unrelated commits. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support for AzureFS
piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support for AzureFS URL: https://github.com/apache/flink/pull/8117#issuecomment-488195295 Seems like the missing ingredient was this commit: https://github.com/apache/flink/pull/8117/commits/8205a210b748110944ac27a3590b995d0c942a42 This was missing when shuai tested and when I first built my 1.6 jars. Unfortunately though, I am running into some shading issues when I try to use it on my cluster so I'll dig into them over the coming days: ``` Caused by: java.lang.NoClassDefFoundError: org/apache/flink/fs/shaded/hadoop3/org/apache/hadoop/hdfs/HdfsConfiguration at org.apache.flink.fs.azurefs.AzureFileSystem.createInitializedAzureFS(AzureFileSystem.java:48) ... ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12375) flink-container job jar does not have read permissions
[ https://issues.apache.org/jira/browse/FLINK-12375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Lamar updated FLINK-12375: --- Description: When building a custom job container using flink-container, the job can't be launched if the provided job jar does not have world-readable permission. This is because the job jar in the container is owned by root:root, but the docker container executes as the flink user. In environments with restrictive umasks (e.g. company laptops) that create files without group and other read permissions by default, this causes the instructions to fail. To reproduce on master: {code:java} cd flink-container/docker cp ../../flink-examples/flink-examples-streaming/target/WordCount.jar . chmod go-r WordCount.jar # still maintain user read permission ./build.sh --job-jar WordCount.jar --from-archive flink-1.8.0-bin-scala_2.11.tgz --image-name flink-job:latest FLINK_DOCKER_IMAGE_NAME=flink-job FLINK_JOB=org.apache.flink.streaming.examples.wordcount.WordCount docker-compose up{code} which results in the following error: {code:java} job-cluster_1 | 2019-04-30 18:40:57,787 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Could not start cluster entrypoint StandaloneJobClusterEntryPoint. job-cluster_1 | org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint StandaloneJobClusterEntryPoint. job-cluster_1 | at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190) job-cluster_1 | at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:535) job-cluster_1 | at org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:105) job-cluster_1 | Caused by: org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent. job-cluster_1 | at org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:257) job-cluster_1 | at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:224) job-cluster_1 | at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172) job-cluster_1 | at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) job-cluster_1 | at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171) job-cluster_1 | ... 2 more job-cluster_1 | Caused by: org.apache.flink.util.FlinkException: Could not load the provided entrypoint class. job-cluster_1 | at org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.createPackagedProgram(ClassPathJobGraphRetriever.java:119) job-cluster_1 | at org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.retrieveJobGraph(ClassPathJobGraphRetriever.java:96) job-cluster_1 | at org.apache.flink.runtime.dispatcher.JobDispatcherFactory.createDispatcher(JobDispatcherFactory.java:62) job-cluster_1 | at org.apache.flink.runtime.dispatcher.JobDispatcherFactory.createDispatcher(JobDispatcherFactory.java:41) job-cluster_1 | at org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:184) job-cluster_1 | ... 
6 more job-cluster_1 | Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.examples.wordcount.WordCount job-cluster_1 | at java.net.URLClassLoader.findClass(URLClassLoader.java:382) job-cluster_1 | at java.lang.ClassLoader.loadClass(ClassLoader.java:424) job-cluster_1 | at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) job-cluster_1 | at java.lang.ClassLoader.loadClass(ClassLoader.java:357) job-cluster_1 | at org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.createPackagedProgram(ClassPathJobGraphRetriever.java:116) job-cluster_1 | ... 10 more{code} This issue can be fixed by chown'ing the job.jar file to flink:flink in the Dockerfile. was: When building a custom job jar using flink-container, the job can't be launched if the provided job jar does not have world-readable permission. This is because the job jar in the container is owned by root:root, but the docker container executes as the flink user. In environments with restrictive umasks (e.g. company laptops) that create files without group and other read permissions by default, this causes the instructions to fail. To reproduce on master: {code:java} cd flink-container/docker cp ../../flink-examples/flink-examples-streaming/target/WordCount.jar . chmod go-r WordCount.jar # still maintain user read permission ./build.sh --job-jar WordCount.jar --from-archive flink-1.8.0-bin-scala_2.11.tgz --image-name flink-job:latest FLINK_DOCKER_IMAGE_NAME=flink-job FLINK_JOB=
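The chown fix described above can be illustrated with a short Dockerfile fragment; the target path is a placeholder for wherever the flink-container Dockerfile already places the job jar, and whether to use COPY --chown or a separate RUN chown is a detail for the actual patch:
{code}
# Hand ownership of the job jar to the flink user so the cluster entrypoint
# can read it even when a restrictive umask stripped the group/other read bits.
COPY --chown=flink:flink job.jar /opt/flink/lib/job.jar

# Equivalent for Docker versions without COPY --chown support:
# COPY job.jar /opt/flink/lib/job.jar
# RUN chown flink:flink /opt/flink/lib/job.jar
{code}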
[GitHub] [flink] piyushnarang edited a comment on issue #8117: [FLINK-12115] [filesystems]: Add support for AzureFS
piyushnarang edited a comment on issue #8117: [FLINK-12115] [filesystems]: Add support for AzureFS URL: https://github.com/apache/flink/pull/8117#issuecomment-488128359 P.S. @tillrohrmann / @shuai-xu - I was able to test this out on a hadoop cluster. I had to copy the following jars to my lib dir to get it to work - `azure-storage-2.0.0.jar`, `flink-azure-fs-hadoop-1.6-SNAPSHOT.jar`, `hadoop-azure-*.jar` We're on flink 1.6. I notice that in case of a 1.6 build, the flink-azure-fs*SNAPSHOT.jar is ~15K in size. However, when I build off this branch (1.9-SNAPSHOT), my flink-azure-fs*SNAPSHOT.jar ends up ~23M in size. @shuai-xu - do you remember which version you tried and what the size of the flink-azure-fs snapshot jar was in your case? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] stevenzwu edited a comment on issue #8315: [FLINK-12368] add subtask index to FlinkKafkaConsumerBase logging, wh…
stevenzwu edited a comment on issue #8315: [FLINK-12368] add subtask index to FlinkKafkaConsumerBase logging, wh… URL: https://github.com/apache/flink/pull/8315#issuecomment-488149092 @Myasuka that is good to know. This can be an option, but I am not sure we always want to enable the thread name in each log line just for this purpose. `FlinkKafkaConsumerBase` already has many existing log lines with the "Consumer subtask " prefix. This PR is to fill in this useful info in the other important lines where it is missing. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] stevenzwu commented on issue #8315: [FLINK-12368] add subtask index to FlinkKafkaConsumerBase logging, wh…
stevenzwu commented on issue #8315: [FLINK-12368] add subtask index to FlinkKafkaConsumerBase logging, wh… URL: https://github.com/apache/flink/pull/8315#issuecomment-488149092 @Myasuka that is good to know. Enabling the thread name in each log line can be an option, but I am not sure we always want to enable it in each log line just for this purpose. `FlinkKafkaConsumerBase` already has many existing log lines with the "Consumer subtask " prefix. This PR is to fill in this useful info in the other important lines where it is missing. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
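For illustration, a minimal sketch of the "Consumer subtask" prefixing pattern being discussed, using a stand-in rich source rather than the actual `FlinkKafkaConsumerBase` code:
```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Stand-in for FlinkKafkaConsumerBase: log lines carry the subtask index
// explicitly instead of relying on the thread name in the log layout.
public abstract class SubtaskAwareSource<T> extends RichParallelSourceFunction<T> {

	private static final Logger LOG = LoggerFactory.getLogger(SubtaskAwareSource.class);

	@Override
	public void open(Configuration parameters) throws Exception {
		final int subtask = getRuntimeContext().getIndexOfThisSubtask();
		LOG.info("Consumer subtask {} opened ({} parallel subtasks in total).",
			subtask, getRuntimeContext().getNumberOfParallelSubtasks());
	}
}
```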
[jira] [Updated] (FLINK-12379) Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint
[ https://issues.apache.org/jira/browse/FLINK-12379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henrik updated FLINK-12379: --- Description: When running one standalone-job w/ parallelism=1 + one taskmanager, you will shortly get this crash {code:java} 2019-04-30 22:20:02,928 WARN org.apache.flink.runtime.jobmaster.JobMaster - Error while processing checkpoint acknowledgement message org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 5. at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:837) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) at org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 'gs://example_bucket/flink/checkpoints//chk-5/_metadata' already exists at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37) at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65) at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104) at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829) ... 8 more Caused by: java.nio.file.FileAlreadyExistsException: Object gs://example_bucket/flink/checkpoints//chk-5/_metadata already exists. at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getWriteGeneration(GoogleCloudStorageImpl.java:1918) at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.create(GoogleCloudStorageImpl.java:410) at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.createInternal(GoogleCloudStorageFileSystem.java:286) at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:264) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:82) ... 19 more 2019-04-30 22:20:03,114 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1556662802928 for job .{code} My guess at why; concurrent checkpoint writers are updating the _metadata resource concurrently. 
They should be using optimistic concurrency control with ETag on GCS, and then retry until successful. When running with parallelism=4, you always seem to get this, even after deleting all checkpoints: {code:java} [analytics-job-cluster-668886c96b-mhqmc job] 2019-04-30 22:50:35,175 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Custom Source -> Process -> Timestamps/Watermarks -> our_events (1/4) of job is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.{code} Or in short: with parallelism > 1, your Flink job never makes progress. was: When running a standalone-job w/ parallelism=4 + taskmanager, you will shortly get this crash {code:java} 2019-04-30 22:20:02,928 WARN org.apache.flink.runtime.jobmaster.JobMaster - Error while processing checkpoint acknowledgement message org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 5. at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:837) at o
[GitHub] [flink] xuefuz commented on a change in pull request #8312: [FLINK-12366][table] Clean up Catalog APIs to make them more consiste…
xuefuz commented on a change in pull request #8312: [FLINK-12366][table] Clean up Catalog APIs to make them more consiste… URL: https://github.com/apache/flink/pull/8312#discussion_r279961356 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java ## @@ -174,14 +172,11 @@ * @param partitionSpec partition spec of partition to get * @return the requested partition * -* @throws TableNotExistException thrown if the table does not exist in the catalog -* @throws TableNotPartitionedException thrown if the table is not partitioned -* @throws PartitionSpecInvalidException thrown if the given partition spec is invalid, * @throws PartitionNotExistException thrown if the partition is not partitioned * @throws CatalogException in case of any runtime exception */ CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) - throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException; + throws PartitionNotExistException, CatalogException; Review comment: In short, partition not existing includes the case of table not existing. getPartition() attempts to get a specific partition for a table. If the table doesn't exist, then of course, the partition doesn't exist. On the other hand, listPartition() fails (throws an exception) if the table doesn't exist, which is reasonable. Just like getTable(), which doesn't throw databasenotexistEx when db doesn't exist. The change makes the behavior consistent across board. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
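To make the described semantics concrete, a hedged sketch of how an in-memory implementation might behave; the `partitions` map field and the exception's constructor arguments are assumptions for illustration, not the actual GenericInMemoryCatalog code:
```java
@Override
public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
		throws PartitionNotExistException, CatalogException {
	// assumed field: Map<ObjectPath, Map<CatalogPartitionSpec, CatalogPartition>> partitions
	Map<CatalogPartitionSpec, CatalogPartition> tablePartitions = partitions.get(tablePath);
	CatalogPartition partition = tablePartitions == null ? null : tablePartitions.get(partitionSpec);

	if (partition == null) {
		// missing table, non-partitioned table, and unknown spec all surface as
		// "partition does not exist" (exception constructor arguments assumed)
		throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
	}
	return partition;
}
```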
[jira] [Created] (FLINK-12379) Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint
Henrik created FLINK-12379: -- Summary: Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint Key: FLINK-12379 URL: https://issues.apache.org/jira/browse/FLINK-12379 Project: Flink Issue Type: Bug Components: FileSystems Affects Versions: 1.8.0 Environment: GCS + {code:java} 1.8.0 1.8 2.11{code} {code:java} com.google.cloud.bigdataoss gcs-connector hadoop2-1.9.16 org.apache.flink flink-connector-filesystem_2.11 ${flink.version} org.apache.flink flink-hadoop-fs ${flink.version} org.apache.flink flink-shaded-hadoop2 ${hadoop.version}-${flink.version} {code} Reporter: Henrik When running a standalone-job w/ parallelism=4 + taskmanager, you will shortly get this crash {code:java} 2019-04-30 22:20:02,928 WARN org.apache.flink.runtime.jobmaster.JobMaster - Error while processing checkpoint acknowledgement message org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 5. at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:837) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) at org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 'gs://example_bucket/flink/checkpoints//chk-5/_metadata' already exists at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37) at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65) at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104) at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829) ... 8 more Caused by: java.nio.file.FileAlreadyExistsException: Object gs://example_bucket/flink/checkpoints//chk-5/_metadata already exists. 
at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getWriteGeneration(GoogleCloudStorageImpl.java:1918) at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.create(GoogleCloudStorageImpl.java:410) at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.createInternal(GoogleCloudStorageFileSystem.java:286) at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:264) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:82) ... 19 more 2019-04-30 22:20:03,114 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1556662802928 for job .{code} My guess at why; concurrent checkpoint writers are updating the _metadata resource concurrently. They should be using optimistic concurrency control with ETag on GCS, and then retry until successful. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12376) GCS runtime exn: Request payload size exceeds the limit
[ https://issues.apache.org/jira/browse/FLINK-12376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henrik updated FLINK-12376: --- Environment: FROM flink:1.8.0-scala_2.11 ARG version=0.17 ADD https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar /opt/flink/lib COPY target/analytics-${version}.jar /opt/flink/lib/analytics.jar was:* k8s latest docker-for-desktop on macOS, and scala 2.11-compiled Flink > GCS runtime exn: Request payload size exceeds the limit > --- > > Key: FLINK-12376 > URL: https://issues.apache.org/jira/browse/FLINK-12376 > Project: Flink > Issue Type: Bug > Components: FileSystems >Affects Versions: 1.7.2 > Environment: FROM flink:1.8.0-scala_2.11 > ARG version=0.17 > ADD > https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar > /opt/flink/lib > COPY target/analytics-${version}.jar /opt/flink/lib/analytics.jar >Reporter: Henrik >Priority: Major > Attachments: Screenshot 2019-04-30 at 22.32.34.png > > > I'm trying to use the google cloud storage file system, but it would seem > that the FLINK / GCS client libs are creating too-large requests far down in > the GCS Java client. > The Java client is added to the lib folder with this command in Dockerfile > (probably > [hadoop2-1.9.16|https://search.maven.org/artifact/com.google.cloud.bigdataoss/gcs-connector/hadoop2-1.9.16/jar] > at the time of writing): > > {code:java} > ADD > https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar > /opt/flink/lib{code} > This is the crash output. Focus lines: > {code:java} > java.lang.RuntimeException: Error while confirming checkpoint{code} > and > {code:java} > Caused by: com.google.api.gax.rpc.InvalidArgumentException: > io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size > exceeds the limit: 524288 bytes.{code} > Full stacktrace: > > {code:java} > [analytics-867c867ff6-l622h taskmanager] 2019-04-30 20:23:14,532 INFO > org.apache.flink.runtime.taskmanager.Task - Source: > Custom Source -> Process -> Timestamps/Watermarks -> app_events (1/1) > (9a01e96c0271025d5ba73b735847cd4c) switched from RUNNING to FAILED. > [analytics-867c867ff6-l622h taskmanager] java.lang.RuntimeException: Error > while confirming checkpoint > [analytics-867c867ff6-l622h taskmanager] at > org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1211) > [analytics-867c867ff6-l622h taskmanager] at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > [analytics-867c867ff6-l622h taskmanager] at > java.util.concurrent.FutureTask.run(FutureTask.java:266) > [analytics-867c867ff6-l622h taskmanager] at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [analytics-867c867ff6-l622h taskmanager] at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > [analytics-867c867ff6-l622h taskmanager] at > java.lang.Thread.run(Thread.java:748) > [analytics-867c867ff6-l622h taskmanager] Caused by: > com.google.api.gax.rpc.InvalidArgumentException: > io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size > exceeds the limit: 524288 bytes. 
> [analytics-867c867ff6-l622h taskmanager] at > com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:49) > [analytics-867c867ff6-l622h taskmanager] at > com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72) > [analytics-867c867ff6-l622h taskmanager] at > com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60) > [analytics-867c867ff6-l622h taskmanager] at > com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) > [analytics-867c867ff6-l622h taskmanager] at > com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68) > [analytics-867c867ff6-l622h taskmanager] at > com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1056) > [analytics-867c867ff6-l622h taskmanager] at > com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30) > [analytics-867c867ff6-l622h taskmanager] at > com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1138) > [analytics-867c867ff6-l622h taskmanager] at > com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:958) > [analytics-867c867ff6-l622h taskmanager] at > com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:748) > [analytics-867c867ff6-l622h taskmanager] at > io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:507) > [analytics-867c867ff6-l622h taskmanage
[GitHub] [flink] xuefuz commented on issue #8314: [FLINK-12365][table] Add stats related catalog APIs
xuefuz commented on issue #8314: [FLINK-12365][table] Add stats related catalog APIs URL: https://github.com/apache/flink/pull/8314#issuecomment-488133623 I updated the PR with the following reworking: 1. Modified CatalogColumnStatistics class to enclose statistical info of all columns for a table/partition. 2. Added CatalogColumnStatisticsDataBase as a base class for column stats values that can be subclassed for different data types. 3. Added two stats value types, boolean and long, as examples. More types will be added in follow-up JIRAs. 4. Returned "unknown" stats for GenericInMemoryCatalog when stats are not available for a table/partition. 5. Modified tests and test utils accordingly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
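For readers following along, a rough sketch of the class shape described in points 2 and 3; the field names and constructors are illustrative guesses, not the API actually added in this PR:
```java
// Base type for per-column statistics values (illustrative shape).
public abstract class CatalogColumnStatisticsDataBase {
	private final Long nullCount; // a statistic shared by all column types

	protected CatalogColumnStatisticsDataBase(Long nullCount) {
		this.nullCount = nullCount;
	}

	public Long getNullCount() {
		return nullCount;
	}
}

// One concrete value type per data type, e.g. long columns (illustrative).
class CatalogColumnStatisticsDataLong extends CatalogColumnStatisticsDataBase {
	private final Long min;
	private final Long max;
	private final Long distinctCount;

	CatalogColumnStatisticsDataLong(Long min, Long max, Long distinctCount, Long nullCount) {
		super(nullCount);
		this.min = min;
		this.max = max;
		this.distinctCount = distinctCount;
	}

	public Long getMin() { return min; }
	public Long getMax() { return max; }
	public Long getDistinctCount() { return distinctCount; }
}
```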
[GitHub] [flink] flinkbot edited a comment on issue #8326: [FLINK-12378][docs] Consolidate FileSystem Documentation
flinkbot edited a comment on issue #8326: [FLINK-12378][docs] Consolidate FileSystem Documentation URL: https://github.com/apache/flink/pull/8326#issuecomment-488131875 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❗ 3. Needs [attention] from. - Needs attention by @fhueske [PMC] * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints
[ https://issues.apache.org/jira/browse/FLINK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830731#comment-16830731 ] Seth Wiesman commented on FLINK-8513: - Included as part of https://issues.apache.org/jira/browse/FLINK-12378 > Add documentation for connecting to non-AWS S3 endpoints > > > Key: FLINK-8513 > URL: https://issues.apache.org/jira/browse/FLINK-8513 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem, Documentation >Reporter: chris snow >Assignee: Seth Wiesman >Priority: Trivial > > It would be useful if the documentation provided information on connecting to > non-AWS S3 endpoints when using presto. For example: > > > You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's > {{flink-conf.yaml}}: > {code:java} > s3.access-key: your-access-key > s3.secret-key: your-secret-key{code} > If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object > Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 > endpoint in Flink's {{flink-conf.yaml}}: > {code:java} > s3.endpoint: your-endpoint-hostname{code} > > > Source: > [https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] sjwiesman commented on issue #8326: [FLINK-12378][docs] Consolidate FileSystem Documentation
sjwiesman commented on issue #8326: [FLINK-12378][docs] Consolidate FileSystem Documentation URL: https://github.com/apache/flink/pull/8326#issuecomment-488131970 @flinkbot attention @fhueske This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8326: [FLINK-12378][docs] Consolidate FileSystem Documentation
flinkbot commented on issue #8326: [FLINK-12378][docs] Consolidate FileSystem Documentation URL: https://github.com/apache/flink/pull/8326#issuecomment-488131875 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sjwiesman opened a new pull request #8326: Flink 12378
sjwiesman opened a new pull request #8326: Flink 12378 URL: https://github.com/apache/flink/pull/8326 ## What is the purpose of the change Currently flink's filesystem documentation is spread across a number of pages without any clear connection. A non-exhaustive list of issues includes: * S3 documentation spread across many pages * OSS filesystem is listed under deployments when it is an object store * deployments/filesystem.md has a lot of unrelated information We should create a filesystem subsection under deployments with multiple pages containing all relevant information about Flink's filesystem abstraction. This PR also resolves FLINK-8513 and FLINK-10249 which were minor additions to the S3 documentation. ## Verifying this change N/A ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (**yes** / no / don't know) This does not touch S3 file system code but does touch the documentation. ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support for AzureFS
piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support for AzureFS URL: https://github.com/apache/flink/pull/8117#issuecomment-488128359 P.S. @tillrohrmann / @shuai-xu - I was able to test this out on a hadoop cluster. I had to copy the following jars to my lib dir to get it to work - `azure-storage-2.0.0.jar`, `flink-azure-fs-hadoop-1.6-SNAPSHOT.jar`, `hadoop-azure-*.jar` I'll dig into how the other filesystems are achieving this in their uber jar so that we can also generate one flink-azure-fs-hadoop jar which includes the azure-storage and hadoop-azure deps. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
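As a reference point, the uber jars of the other filesystem modules are typically produced with the maven-shade-plugin; a rough sketch of what a comparable setup for flink-azure-fs-hadoop might look like is below. The included artifacts and relocation pattern are illustrative guesses, not the module's actual build configuration.
```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <artifactSet>
          <includes>
            <!-- bundle the Azure/Hadoop dependencies instead of requiring them in lib/ -->
            <include>com.microsoft.azure:azure-storage</include>
            <include>org.apache.hadoop:hadoop-azure</include>
          </includes>
        </artifactSet>
        <relocations>
          <!-- relocate to avoid classpath clashes with user code -->
          <relocation>
            <pattern>com.microsoft.azure</pattern>
            <shadedPattern>org.apache.flink.fs.azure.shaded.com.microsoft.azure</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```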
[jira] [Created] (FLINK-12378) Consolidate FileSystem Documentation
Seth Wiesman created FLINK-12378: Summary: Consolidate FileSystem Documentation Key: FLINK-12378 URL: https://issues.apache.org/jira/browse/FLINK-12378 Project: Flink Issue Type: Improvement Components: Documentation, FileSystems Reporter: Seth Wiesman Assignee: Seth Wiesman Currently flink's filesystem documentation is spread across a number of pages without any clear connection. A non-exhaustive list of issues includes: * S3 documentation spread across many pages * OSS filesystem is listed under deployments when it is an object store * deployments/filesystem.md has a lot of unrelated information We should create a filesystem subsection under deployments with multiple pages containing all relevant information about Flink's filesystem abstraction. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints
[ https://issues.apache.org/jira/browse/FLINK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Seth Wiesman reassigned FLINK-8513: --- Assignee: Seth Wiesman > Add documentation for connecting to non-AWS S3 endpoints > > > Key: FLINK-8513 > URL: https://issues.apache.org/jira/browse/FLINK-8513 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem, Documentation >Reporter: chris snow >Assignee: Seth Wiesman >Priority: Trivial > > It would be useful if the documentation provided information on connecting to > non-AWS S3 endpoints when using presto. For example: > > > You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's > {{flink-conf.yaml}}: > {code:java} > s3.access-key: your-access-key > s3.secret-key: your-secret-key{code} > If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object > Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 > endpoint in Flink's {{flink-conf.yaml}}: > {code:java} > s3.endpoint: your-endpoint-hostname{code} > > > Source: > [https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12377) Outdated docs (Flink 0.10) for Google Compute Engine
Henrik created FLINK-12377: -- Summary: Outdated docs (Flink 0.10) for Google Compute Engine Key: FLINK-12377 URL: https://issues.apache.org/jira/browse/FLINK-12377 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.8.0 Reporter: Henrik [https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/gce_setup.html] links to [https://github.com/GoogleCloudPlatform/bdutil/blob/master/extensions/flink/flink_env.sh] which uses ancient versions of Hadoop and Flink. Another barrier for a newcomer is that bdutil itself is deprecated and its readme recommends DataFlow instead. Furthermore, perhaps it would be wise to include GCP in the built-in filesystems in [https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/filesystems.html?|https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/filesystems.html] Further, [https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/filesystems.html#hdfs-and-hadoop-file-system-support] doesn't actually link to any of the other configuration pages for these other hadoop-based filesystems, nor does it explain what exact library needs to be in the flink `lib` folder, so it's really hard to go any further from there. Lastly, it would seem the 1.8.0 release has stopped shipping the Hadoop libs, without documenting the change required, e.g. in the Hadoop File System page linked above. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10249) Document hadoop/presto s3 file system configuration forwarding
[ https://issues.apache.org/jira/browse/FLINK-10249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Seth Wiesman reassigned FLINK-10249: Assignee: Seth Wiesman > Document hadoop/presto s3 file system configuration forwarding > -- > > Key: FLINK-10249 > URL: https://issues.apache.org/jira/browse/FLINK-10249 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem, Documentation >Reporter: Andrey Zagrebin >Assignee: Seth Wiesman >Priority: Minor > > Flink hadoop and presto s3 file system factories (S3FileSystemFactory) use > HadoopConfigLoader which automatically converts and prefixes s3.* config > options to configure underlying s3 clients. We can leave at least a hint > about this behaviour for users who want to change config of these underlying > s3 clients, e.g. in docs/ops/deployment/aws.md -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12376) GCS runtime exn: Request payload size exceeds the limit
[ https://issues.apache.org/jira/browse/FLINK-12376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henrik updated FLINK-12376: --- Description: I'm trying to use the google cloud storage file system, but it would seem that the FLINK / GCS client libs are creating too-large requests far down in the GCS Java client. The Java client is added to the lib folder with this command in Dockerfile (probably [hadoop2-1.9.16|https://search.maven.org/artifact/com.google.cloud.bigdataoss/gcs-connector/hadoop2-1.9.16/jar] at the time of writing): {code:java} ADD https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar /opt/flink/lib{code} This is the crash output. Focus lines: {code:java} java.lang.RuntimeException: Error while confirming checkpoint{code} and {code:java} Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size exceeds the limit: 524288 bytes.{code} Full stacktrace: {code:java} [analytics-867c867ff6-l622h taskmanager] 2019-04-30 20:23:14,532 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Process -> Timestamps/Watermarks -> app_events (1/1) (9a01e96c0271025d5ba73b735847cd4c) switched from RUNNING to FAILED. [analytics-867c867ff6-l622h taskmanager] java.lang.RuntimeException: Error while confirming checkpoint [analytics-867c867ff6-l622h taskmanager] at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1211) [analytics-867c867ff6-l622h taskmanager] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [analytics-867c867ff6-l622h taskmanager] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [analytics-867c867ff6-l622h taskmanager] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [analytics-867c867ff6-l622h taskmanager] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [analytics-867c867ff6-l622h taskmanager] at java.lang.Thread.run(Thread.java:748) [analytics-867c867ff6-l622h taskmanager] Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size exceeds the limit: 524288 bytes. 
[analytics-867c867ff6-l622h taskmanager] at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:49) [analytics-867c867ff6-l622h taskmanager] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72) [analytics-867c867ff6-l622h taskmanager] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60) [analytics-867c867ff6-l622h taskmanager] at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) [analytics-867c867ff6-l622h taskmanager] at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68) [analytics-867c867ff6-l622h taskmanager] at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1056) [analytics-867c867ff6-l622h taskmanager] at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30) [analytics-867c867ff6-l622h taskmanager] at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1138) [analytics-867c867ff6-l622h taskmanager] at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:958) [analytics-867c867ff6-l622h taskmanager] at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:748) [analytics-867c867ff6-l622h taskmanager] at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:507) [analytics-867c867ff6-l622h taskmanager] at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:482) [analytics-867c867ff6-l622h taskmanager] at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) [analytics-867c867ff6-l622h taskmanager] at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) [analytics-867c867ff6-l622h taskmanager] at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) [analytics-867c867ff6-l622h taskmanager] at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:694) [analytics-867c867ff6-l622h taskmanager] at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) [analytics-867c867ff6-l622h taskmanager] at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) [analytics-867c867ff6-l622h taskmanager] at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) [analytics-867c8
[jira] [Commented] (FLINK-12355) KafkaITCase.testTimestamps is unstable
[ https://issues.apache.org/jira/browse/FLINK-12355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830686#comment-16830686 ] Bowen Li commented on FLINK-12355: -- saw another failure https://api.travis-ci.org/v3/job/526561585/log.txt > KafkaITCase.testTimestamps is unstable > -- > > Key: FLINK-12355 > URL: https://issues.apache.org/jira/browse/FLINK-12355 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka, Test Infrastructure >Affects Versions: 1.9.0 >Reporter: Yu Li >Priority: Major > Labels: test-stability > Fix For: 1.9.0 > > > The {{KafkaITCase.testTimestamps}} failed on Travis because it timed out. > https://api.travis-ci.org/v3/job/525503117/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12355) KafkaITCase.testTimestamps is unstable
[ https://issues.apache.org/jira/browse/FLINK-12355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12355: - Affects Version/s: 1.9.0 > KafkaITCase.testTimestamps is unstable > -- > > Key: FLINK-12355 > URL: https://issues.apache.org/jira/browse/FLINK-12355 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka, Test Infrastructure >Affects Versions: 1.9.0 >Reporter: Yu Li >Priority: Major > Labels: test-stability > > The {{KafkaITCase.testTimestamps}} failed on Travis because it timed out. > https://api.travis-ci.org/v3/job/525503117/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12355) KafkaITCase.testTimestamps is unstable
[ https://issues.apache.org/jira/browse/FLINK-12355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12355: - Fix Version/s: 1.9.0 > KafkaITCase.testTimestamps is unstable > -- > > Key: FLINK-12355 > URL: https://issues.apache.org/jira/browse/FLINK-12355 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka, Test Infrastructure >Affects Versions: 1.9.0 >Reporter: Yu Li >Priority: Major > Labels: test-stability > Fix For: 1.9.0 > > > The {{KafkaITCase.testTimestamps}} failed on Travis because it timed out. > https://api.travis-ci.org/v3/job/525503117/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support for AzureFS
piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support for AzureFS URL: https://github.com/apache/flink/pull/8117#issuecomment-488112927 @tillrohrmann do you know if there's any special setup required for the flink e2e tests to run on developer machines? I'm trying to run a azure_fs e2e test I've created and am consistently seeing the test failing due to a timeout as the dispatcher rest endpoint hasn't come up in 20s. I also see the same error when I try to run the test_batch_wordcount.sh: ``` $ export FLINK_DIR=flink-dist/target/flink-1.9-SNAPSHOT-bin/flink-1.9-SNAPSHOT $ export HADOOP_CLASSPATH=... $ export HADOOP_CONF_DIR=... $ flink-end-to-end-tests/run-single-test.sh flink-end-to-end-tests/test-scripts/test_batch_wordcount.sh Starting cluster. Starting standalonesession daemon on host C02T3863HF1R. Starting taskexecutor daemon on host C02T3863HF1R. Waiting for dispatcher REST endpoint to come up... ... Dispatcher REST endpoint has not started within a timeout of 20 sec [FAIL] Test script contains errors. Checking for errors... No errors in log files. Checking for exceptions... No exceptions in log files. Checking for non-empty .out files... Found non-empty .out files: SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/p.narang/workspace/flink/flink-dist/target/flink-1.9-SNAPSHOT-bin/flink-1.9-SNAPSHOT/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Users/p.narang/Downloads/hadoop-2.7.7/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/p.narang/workspace/flink/flink-dist/target/flink-1.9-SNAPSHOT-bin/flink-1.9-SNAPSHOT/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Users/p.narang/Downloads/hadoop-2.7.7/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] [FAIL] 'flink-end-to-end-tests/test-scripts/test_batch_wordcount.sh' failed after 0 minutes and 26 seconds! Test exited with exit code 1 and the logs contained errors, exceptions or non-empty .out files ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
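For anyone hitting the same symptom locally, one way to narrow down why the dispatcher REST endpoint did not come up is to inspect the standalone-session log and probe the REST port directly. This is only a debugging sketch; the log file pattern and the default REST port 8081 are assumptions based on a standard Flink distribution and may differ if `rest.port` is overridden in flink-conf.yaml.
```
# Debugging sketch (assumes a standard Flink dist layout and the default REST port 8081)
ls "$FLINK_DIR/log/"
grep -iE "error|exception|address already in use" "$FLINK_DIR"/log/flink-*-standalonesession-*.log
curl -s http://localhost:8081/overview || echo "REST endpoint not reachable"
```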
[GitHub] [flink] yanyan300300 commented on issue #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem
yanyan300300 commented on issue #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem URL: https://github.com/apache/flink/pull/8215#issuecomment-488111519 @tillrohrmann @aljoscha Could you kindly review this change? Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-12376) GCS runtime exn: Request payload size exceeds the limit
Henrik created FLINK-12376: -- Summary: GCS runtime exn: Request payload size exceeds the limit Key: FLINK-12376 URL: https://issues.apache.org/jira/browse/FLINK-12376 Project: Flink Issue Type: Bug Components: FileSystems Affects Versions: 1.7.2 Environment: * k8s latest docker-for-desktop on macOS, and scala 2.11-compiled Flink Reporter: Henrik Attachments: Screenshot 2019-04-30 at 22.32.34.png I'm trying to use the google cloud storage file system, but it would seem that the FLINK / GCS client libs are creating too-large requests far down in the GCS Java client. {code:java} [analytics-867c867ff6-l622h taskmanager] 2019-04-30 20:23:14,532 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Process -> Timestamps/Watermarks -> app_events (1/1) (9a01e96c0271025d5ba73b735847cd4c) switched from RUNNING to FAILED. [analytics-867c867ff6-l622h taskmanager] java.lang.RuntimeException: Error while confirming checkpoint [analytics-867c867ff6-l622h taskmanager] at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1211) [analytics-867c867ff6-l622h taskmanager] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [analytics-867c867ff6-l622h taskmanager] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [analytics-867c867ff6-l622h taskmanager] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [analytics-867c867ff6-l622h taskmanager] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [analytics-867c867ff6-l622h taskmanager] at java.lang.Thread.run(Thread.java:748) [analytics-867c867ff6-l622h taskmanager] Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size exceeds the limit: 524288 bytes. 
[analytics-867c867ff6-l622h taskmanager] at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:49) [analytics-867c867ff6-l622h taskmanager] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72) [analytics-867c867ff6-l622h taskmanager] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60) [analytics-867c867ff6-l622h taskmanager] at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) [analytics-867c867ff6-l622h taskmanager] at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68) [analytics-867c867ff6-l622h taskmanager] at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1056) [analytics-867c867ff6-l622h taskmanager] at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30) [analytics-867c867ff6-l622h taskmanager] at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1138) [analytics-867c867ff6-l622h taskmanager] at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:958) [analytics-867c867ff6-l622h taskmanager] at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:748) [analytics-867c867ff6-l622h taskmanager] at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:507) [analytics-867c867ff6-l622h taskmanager] at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:482) [analytics-867c867ff6-l622h taskmanager] at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) [analytics-867c867ff6-l622h taskmanager] at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) [analytics-867c867ff6-l622h taskmanager] at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) [analytics-867c867ff6-l622h taskmanager] at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:694) [analytics-867c867ff6-l622h taskmanager] at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) [analytics-867c867ff6-l622h taskmanager] at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) [analytics-867c867ff6-l622h taskmanager] at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) [analytics-867c867ff6-l622h taskmanager] at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:397) [analytics-867c867ff6-l622h taskmanager] at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:459) [analytics-867c867ff6-l622h taskmanager] at io.grpc.internal.ClientCa
[GitHub] [flink] piyushnarang commented on a change in pull request #8117: [FLINK-12115] [filesystems]: Add support for AzureFS
piyushnarang commented on a change in pull request #8117: [FLINK-12115] [filesystems]: Add support for AzureFS URL: https://github.com/apache/flink/pull/8117#discussion_r279896298 ## File path: flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFileSystem.java ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.azurefs; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystemKind; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.runtime.util.HadoopUtils; + +import org.apache.hadoop.fs.azure.NativeAzureFileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +/** + * Azure FileSystem connector for Flink. Based on Azure HDFS support in the + * https://hadoop.apache.org/docs/current/hadoop-azure/index.html";>hadoop-azure module. + */ +public class AzureFileSystem extends HadoopFileSystem { + private static final Logger LOG = LoggerFactory.getLogger(AzureFileSystem.class); + + private static final String[] CONFIG_PREFIXES = { "fs.azure.", "azure." }; + + public AzureFileSystem(URI fsUri, Configuration flinkConfig) throws IOException { + super(createInitializedAzureFS(fsUri, flinkConfig)); + } + + // uri is of the form: wasb(s)://yourcontai...@youraccount.blob.core.windows.net/testDir + private static org.apache.hadoop.fs.FileSystem createInitializedAzureFS(URI fsUri, Configuration flinkConfig) throws IOException { + org.apache.hadoop.conf.Configuration hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig); + + copyFlinkToHadoopConfig(flinkConfig, hadoopConfig); Review comment: hmm so I'm wondering if we could push this to a follow up change. The hadoop config loader is in the flink-s3-fs-base module. So we'll have to spin up a new module and refactor things a bit in the s3 code as well. Also if we want to do it correctly, we probably want to use the config loader for the other non-S3 filesystems (like OSS) which are also currently doing things in a similar fashion. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-12375) flink-container job jar does not have read permissions
Adam Lamar created FLINK-12375: -- Summary: flink-container job jar does not have read permissions Key: FLINK-12375 URL: https://issues.apache.org/jira/browse/FLINK-12375 Project: Flink Issue Type: Bug Reporter: Adam Lamar When building a custom job jar using flink-container, the job can't be launched if the provided job jar does not have world-readable permission. This is because the job jar in the container is owned by root:root, but the docker container executes as the flink user. In environments with restrictive umasks (e.g. company laptops) that create files without group and other read permissions by default, this causes the instructions to fail. To reproduce on master: {code:java} cd flink-container/docker cp ../../flink-examples/flink-examples-streaming/target/WordCount.jar . chmod go-r WordCount.jar # still maintain user read permission ./build.sh --job-jar WordCount.jar --from-archive flink-1.8.0-bin-scala_2.11.tgz --image-name flink-job:latest FLINK_DOCKER_IMAGE_NAME=flink-job FLINK_JOB=org.apache.flink.streaming.examples.wordcount.WordCount docker-compose up{code} which results in the following error: {code:java} job-cluster_1 | 2019-04-30 18:40:57,787 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Could not start cluster entrypoint StandaloneJobClusterEntryPoint. job-cluster_1 | org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint StandaloneJobClusterEntryPoint. job-cluster_1 | at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190) job-cluster_1 | at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:535) job-cluster_1 | at org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:105) job-cluster_1 | Caused by: org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent. job-cluster_1 | at org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:257) job-cluster_1 | at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:224) job-cluster_1 | at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172) job-cluster_1 | at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) job-cluster_1 | at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171) job-cluster_1 | ... 2 more job-cluster_1 | Caused by: org.apache.flink.util.FlinkException: Could not load the provided entrypoint class. job-cluster_1 | at org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.createPackagedProgram(ClassPathJobGraphRetriever.java:119) job-cluster_1 | at org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.retrieveJobGraph(ClassPathJobGraphRetriever.java:96) job-cluster_1 | at org.apache.flink.runtime.dispatcher.JobDispatcherFactory.createDispatcher(JobDispatcherFactory.java:62) job-cluster_1 | at org.apache.flink.runtime.dispatcher.JobDispatcherFactory.createDispatcher(JobDispatcherFactory.java:41) job-cluster_1 | at org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:184) job-cluster_1 | ... 
6 more job-cluster_1 | Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.examples.wordcount.WordCount job-cluster_1 | at java.net.URLClassLoader.findClass(URLClassLoader.java:382) job-cluster_1 | at java.lang.ClassLoader.loadClass(ClassLoader.java:424) job-cluster_1 | at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) job-cluster_1 | at java.lang.ClassLoader.loadClass(ClassLoader.java:357) job-cluster_1 | at org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.createPackagedProgram(ClassPathJobGraphRetriever.java:116) job-cluster_1 | ... 10 more{code} This issue can be fixed by chown'ing the job.jar file to flink:flink in the Dockerfile. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
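As an illustration of the suggested fix, a minimal sketch of the kind of Dockerfile change described above; the in-image path and the exact instruction used by build.sh are assumptions, only the flink:flink ownership comes from the report:
{code}
# Hypothetical sketch, not the actual flink-container Dockerfile:
# make the bundled job jar readable by the flink user that the container runs as.
COPY WordCount.jar /opt/flink/lib/job.jar
RUN chown flink:flink /opt/flink/lib/job.jar
{code}
An equivalent host-side workaround is to restore world-read permission on the jar (for example {{chmod o+r WordCount.jar}}) before running build.sh.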
[GitHub] [flink] yanyan300300 commented on issue #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem
yanyan300300 commented on issue #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem URL: https://github.com/apache/flink/pull/8215#issuecomment-488062731 > Indeed, it looks like this is the better approach. Aside from some minor details in the code, I would only have one concern which I'm not sure about: > > * Are we allowed to change the local resource's time? What are the implications? Yes, Flink will need to set the local resource's time explicitly. This is also the case before your change. > > I guess, this is the local resource file that was uploaded to the JM and there are no further implications but I'm not 100% sure. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] yanyan300300 edited a comment on issue #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem
yanyan300300 edited a comment on issue #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem URL: https://github.com/apache/flink/pull/8215#issuecomment-488062731 > Indeed, it looks like this is the better approach. Aside from some minor details in the code, I would only have one concern which I'm not sure about: > > * Are we allowed to change the local resource's time? What are the implications? > > I guess, this is the local resource file that was uploaded to the JM and there are no further implications but I'm not 100% sure. Yes, Flink will need to set the local resource's time explicitly. This is also the case before your change. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] yanyan300300 commented on a change in pull request #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem
yanyan300300 commented on a change in pull request #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem URL: https://github.com/apache/flink/pull/8215#discussion_r279876022 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ## @@ -160,8 +170,37 @@ public static void setupYarnClassPath(Configuration conf, Map ap fs.copyFromLocalFile(false, true, localSrcPath, dst); + // Note: If we directly used registerLocalResource(FileSystem, Path) here, we would access the remote + // file once again which has problems with eventually consistent read-after-write file + // systems. Instead, we decide to wait until the remote file be available. + + FileStatus[] fss = null; + int iter = 1; + while (iter <= REMOTE_RESOURCES_FETCH_NUM_RETRY) { Review comment: Also, could you clarify what you mean by "did in YarnFileStageTest#testCopyFromLocalRecursive()"? Are you suggesting testing the retry on `FileNotFoundException`? I think Flink is currently using a real S3 filesystem instead of a mock, so I am not quite sure how to do that. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] yanyan300300 commented on a change in pull request #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem
yanyan300300 commented on a change in pull request #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem URL: https://github.com/apache/flink/pull/8215#discussion_r279873137 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ## @@ -160,8 +170,37 @@ public static void setupYarnClassPath(Configuration conf, Map ap fs.copyFromLocalFile(false, true, localSrcPath, dst); + // Note: If we directly used registerLocalResource(FileSystem, Path) here, we would access the remote + // file once again which has problems with eventually consistent read-after-write file + // systems. Instead, we decide to wait until the remote file be available. + + FileStatus[] fss = null; + int iter = 1; + while (iter <= REMOTE_RESOURCES_FETCH_NUM_RETRY) { Review comment: Thanks. Changed to `REMOTE_RESOURCES_FETCH_NUM_RETRY + 1` so that it will do an initial attempt at retry = 0. The retry definition is in line 86, which should only happen when the `FileNotFoundException` happens. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] yanyan300300 commented on a change in pull request #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem
yanyan300300 commented on a change in pull request #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem URL: https://github.com/apache/flink/pull/8215#discussion_r279865426 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ## @@ -160,8 +170,37 @@ public static void setupYarnClassPath(Configuration conf, Map ap fs.copyFromLocalFile(false, true, localSrcPath, dst); + // Note: If we directly used registerLocalResource(FileSystem, Path) here, we would access the remote + // file once again which has problems with eventually consistent read-after-write file + // systems. Instead, we decide to wait until the remote file be available. + + FileStatus[] fss = null; + int iter = 1; + while (iter <= REMOTE_RESOURCES_FETCH_NUM_RETRY) { + try { + fss = fs.listStatus(dst); + break; + } catch (FileNotFoundException e) { + LOG.debug("Got FileNotFoundException while fetching uploaded remote resources at retry num {}", iter); + try { + LOG.debug("Sleeping for {}ms", REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI); + TimeUnit.MILLISECONDS.sleep(REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI); + } catch (InterruptedException ie) { + LOG.warn("Failed to sleep for {}ms at retry num {} while fetching uploaded remote resources", + REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI, iter, ie); + } + iter++; + } + } + long dstModificationTime = -1; + if (fss != null && fss.length > 0) { + dstModificationTime = fss[0].getModificationTime(); + } + LOG.debug("Got modification time {} from remote path {} at time {}", dstModificationTime, dst, Instant.now().toEpochMilli()); Review comment: Yes, you are right. Removed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
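For readers skimming the thread, a self-contained sketch of the retry-until-visible pattern under discussion; the helper name, constants and error handling are illustrative and not the PR's actual code:
```java
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

final class RemoteResourceWait {

	private static final int MAX_ATTEMPTS = 3;    // illustrative value
	private static final long WAIT_MILLIS = 100L; // illustrative value

	/** Polls the remote path until the eventually consistent file system makes it visible. */
	static FileStatus waitUntilVisible(FileSystem fs, Path dst) throws IOException, InterruptedException {
		for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
			try {
				// listStatus throws FileNotFoundException until the freshly written object is visible
				FileStatus[] statuses = fs.listStatus(dst);
				if (statuses.length > 0) {
					return statuses[0];
				}
			} catch (FileNotFoundException e) {
				// not visible yet, fall through to sleep and retry
			}
			TimeUnit.MILLISECONDS.sleep(WAIT_MILLIS);
		}
		throw new IOException("Remote resource " + dst + " did not become visible after " + MAX_ATTEMPTS + " attempts");
	}
}
```
The modification time taken from the returned FileStatus can then be used when registering the YARN local resource, which is the part of the change the diff above is concerned with.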
[GitHub] [flink] yanyan300300 commented on a change in pull request #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem
yanyan300300 commented on a change in pull request #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem URL: https://github.com/apache/flink/pull/8215#discussion_r279864771 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ## @@ -187,6 +226,27 @@ public static void deleteApplicationFiles(final Map env) { LOG.debug("No yarn application files directory set. Therefore, cannot clean up the data."); } } + /** +* Creates a YARN resource for the remote object at the given location. +* +* @param remoteRsrcPathremote location of the resource +* @param resourceSize size of the resource +* @param resourceModificationTime last modification time of the resource +* +* @return YARN resource +*/ + private static LocalResource registerLocalResource( + Path remoteRsrcPath, + long resourceSize, + long resourceModificationTime) { + LocalResource localResource = Records.newRecord(LocalResource.class); + localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri())); + localResource.setSize(resourceSize); + localResource.setTimestamp(resourceModificationTime); + localResource.setType(LocalResourceType.FILE); + localResource.setVisibility(LocalResourceVisibility.APPLICATION); + return localResource; + } Review comment: The revert is suggested by Till in the Jira. Could you be specific on indentation on which line? It seems following the convention. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8325: [hotfix] [table] Refactor GenericInMemoryCatalogTest to prepare for moving common tests to CatalogTestBase
flinkbot commented on issue #8325: [hotfix] [table] Refactor GenericInMemoryCatalogTest to prepare for moving common tests to CatalogTestBase URL: https://github.com/apache/flink/pull/8325#issuecomment-488043959 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 opened a new pull request #8325: [hotfix] [table] Refactor GenericInMemoryCatalogTest to prepare for moving common tests to CatalogTestBase
bowenli86 opened a new pull request #8325: [hotfix] [table] Refactor GenericInMemoryCatalogTest to prepare for moving common tests to CatalogTestBase URL: https://github.com/apache/flink/pull/8325 ## What is the purpose of the change This PR refactors existing GenericInMemoryCatalogTest as preparation for moving common tests to CatalogTestBase. Since the unit test logic for different catalog's API implementations are the mostly the same, CatalogTestBase should hold all those common tests so we can reuse them for different catalog test class rather than writing duplicated tests. ## Brief change log - Split unit tests for partitioned table and non-partitioned table - Moved CatalogTestUtil from flink-table-api-java to flink-table-common - Removed some redundant utils from CatalogTestUtil ## Verifying this change This change is already covered by existing tests, such as *GenericInMemoryCatalogTest*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
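To illustrate the test-reuse idea in the description, a minimal sketch of the shared test-base pattern; all class and method names here are hypothetical and intentionally avoid Flink's actual catalog API:
```java
import org.junit.Test;

import static org.junit.Assert.assertTrue;

// Common catalog tests live in the abstract base; each catalog implementation
// (in-memory, Hive, ...) only has to supply its own instance via createCatalog().
public abstract class CatalogTestBase {

	/** Deliberately minimal stand-in for the catalog interface under test. */
	public interface TestCatalog {
		void createDatabase(String name) throws Exception;
		boolean databaseExists(String name);
	}

	protected abstract TestCatalog createCatalog();

	@Test
	public void testCreateDatabase() throws Exception {
		TestCatalog catalog = createCatalog();
		catalog.createDatabase("db1");
		assertTrue(catalog.databaseExists("db1"));
	}
}
```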
[jira] [Closed] (FLINK-11475) Adapt existing InMemoryExternalCatalog to GenericInMemoryCatalog
[ https://issues.apache.org/jira/browse/FLINK-11475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li closed FLINK-11475. Resolution: Won't Fix created GenericInMemoryCatalog from scratch > Adapt existing InMemoryExternalCatalog to GenericInMemoryCatalog > > > Key: FLINK-11475 > URL: https://issues.apache.org/jira/browse/FLINK-11475 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Xuefu Zhang >Assignee: Bowen Li >Priority: Major > > {{GenericInMemoryCatalog}} needs to implement ReadableWritableCatalog > interface based on the design. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11921) Upgrade Calcite dependency to 1.19
[ https://issues.apache.org/jira/browse/FLINK-11921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830474#comment-16830474 ] Rong Rong commented on FLINK-11921: --- Thanks for the follow-up, [~ykt836]. I found the issue and it is actually due to a follow-up commit from [CALCITE PR #1002|https://github.com/apache/calcite/pull/1002]. This removes the corresponding rel datatype from the digestToRelMap. Thus one solution is altering the way we compute the digest for a table source scan with pushed-down predicates. I created a PR for this purpose. Please kindly take a look. > Upgrade Calcite dependency to 1.19 > -- > > Key: FLINK-11921 > URL: https://issues.apache.org/jira/browse/FLINK-11921 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Timo Walther >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Umbrella issue for all tasks related to the next Calcite upgrade. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
flinkbot commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#issuecomment-488032392 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11921) Upgrade Calcite dependency to 1.19
[ https://issues.apache.org/jira/browse/FLINK-11921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11921: --- Labels: pull-request-available (was: ) > Upgrade Calcite dependency to 1.19 > -- > > Key: FLINK-11921 > URL: https://issues.apache.org/jira/browse/FLINK-11921 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Timo Walther >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > > Umbrella issue for all tasks related to the next Calcite upgrade. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] walterddr opened a new pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19
walterddr opened a new pull request #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324 ## What is the purpose of the change This change upgrades apache calcite dependencies to 1.19.0 release. ## Brief change log - Updated pom.xml file to include calcite 1.19.0 - Updated all test cases that includes literals which now includes type in digest. - Updated the 2 pull-in files `DateTimeUtils` and `AuxiliaryConverter` and updated the depending calcite JIRAs which will eventually allow us to remove these two pull-ins. - Updated the `FlinkLogicalTableSourceScan` to include a new computeDigest override, this allows predicator push-down rules like `PushFilterIntoTableSourceScanRule` and `PushProjectIntoTableSourceScanRule` to find correct entry in digestToRelMap (see CALCITE-2454, and CALCITE PR #1002) ## Verifying this change - This change is already covered by existing tests - Verified via `mvn dependeny:tree` that no extra dependencies were pulled in. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? n/a This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add stats related catalog APIs
bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add stats related catalog APIs URL: https://github.com/apache/flink/pull/8314#discussion_r279839941 ## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java ## @@ -1104,6 +1104,43 @@ public void testDropFunction_FunctionNotExist_ignored() throws Exception { catalog.dropDatabase(db1, false); } + // -- statistics -- Review comment: > Well, I think the added test is written differently as others, which gives such an expression. However, it covers all the added implementation methods, as you can see. What's missing, though, is some negative test cases, which I will create a followup JIRA for this. I think we need to have a consistent style for unit tests in this class, otherwise it's hard for other developers to figure out the two styles and extend/modify the testing code This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on issue #7822: [FLINK-11726][network] Refactor the creation of ResultPartition and InputGate into NetworkEnvironment
zhijiangW commented on issue #7822: [FLINK-11726][network] Refactor the creation of ResultPartition and InputGate into NetworkEnvironment URL: https://github.com/apache/flink/pull/7822#issuecomment-488025285 I updated the code to address the remaining comments. @azagrebin Regarding `NetworkEnvironment#start` for the other tests, I left some concerns. To make rebasing onto master and resolving the conflicts easier, I squashed all the fixup commits together. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7822: [FLINK-11726][network] Refactor the creation of ResultPartition and InputGate into NetworkEnvironment
zhijiangW commented on a change in pull request #7822: [FLINK-11726][network] Refactor the creation of ResultPartition and InputGate into NetworkEnvironment URL: https://github.com/apache/flink/pull/7822#discussion_r279837029 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ## @@ -220,6 +222,8 @@ public void setup() throws IOException { jobManagerLeaderRetriever = new SettableLeaderRetrievalService(); haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever); haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetriever); + + networkEnvironment = new NetworkEnvironmentBuilder().build(); Review comment: I only call start for `testShouldShutDownTaskManagerServicesInPostStop` to keep the same behavior as before. For the other tests in `TaskTest` and `TaskAsyncCallTest`, I think it is better not to call start at the moment, for the following reasons: 1. Start would bind a port, which takes some time for each test 2. The bound port would consume more system resources 3. It is not actually useful to start the `NettyServer`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11637) Translate "Checkpoints" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830423#comment-16830423 ] dalongliu commented on FLINK-11637: --- Hi, [~klion26] Is anyone doing the work now? > Translate "Checkpoints" page into Chinese > - > > Key: FLINK-11637 > URL: https://issues.apache.org/jira/browse/FLINK-11637 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Congxian Qiu(klion26) >Priority: Major > > doc locates in flink/docs/ops/state/checkpoints.md -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zhijiangW commented on a change in pull request #7822: [FLINK-11726][network] Refactor the creation of ResultPartition and InputGate into NetworkEnvironment
zhijiangW commented on a change in pull request #7822: [FLINK-11726][network] Refactor the creation of ResultPartition and InputGate into NetworkEnvironment URL: https://github.com/apache/flink/pull/7822#discussion_r279816732 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ## @@ -220,6 +222,8 @@ public void setup() throws IOException { jobManagerLeaderRetriever = new SettableLeaderRetrievalService(); haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever); haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetriever); + + networkEnvironment = new NetworkEnvironmentBuilder().build(); Review comment: Actually it is not necessary to call start in these tests, because start would only make the `NettyServer` bind a port to listen on and would not change any internal state. The `NettyServer` is not really needed in these tests. Certainly it would do no harm to always call start. Do you think we should do that in these tests? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7822: [FLINK-11726][network] Refactor the creation of ResultPartition and InputGate into NetworkEnvironment
zhijiangW commented on a change in pull request #7822: [FLINK-11726][network] Refactor the creation of ResultPartition and InputGate into NetworkEnvironment URL: https://github.com/apache/flink/pull/7822#discussion_r279814078 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ## @@ -251,14 +262,9 @@ public void testExecutionFailsInBlobsMissing() throws Exception { @Test public void testExecutionFailsInNetworkRegistration() throws Exception { - // mock a network manager that rejects registration - final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); final ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier(); final PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class); - - final NetworkEnvironment network = mock(NetworkEnvironment.class); - when(network.getResultPartitionManager()).thenReturn(partitionManager); - doThrow(new RuntimeException("buffers")).when(network).registerTask(any(Task.class)); + final NetworkEnvironment network = new NetworkEnvironmentBuilder().build(); Review comment: Yes, you are right when I double check the codes. I thought this `NetworkEnvironment` is created only once for all the tests. It should be no problem to create new one for each test. I would fix it now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7822: [FLINK-11726][network] Refactor the creation of ResultPartition and InputGate into NetworkEnvironment
zhijiangW commented on a change in pull request #7822: [FLINK-11726][network] Refactor the creation of ResultPartition and InputGate into NetworkEnvironment URL: https://github.com/apache/flink/pull/7822#discussion_r279814078 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ## @@ -251,14 +262,9 @@ public void testExecutionFailsInBlobsMissing() throws Exception { @Test public void testExecutionFailsInNetworkRegistration() throws Exception { - // mock a network manager that rejects registration - final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); final ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier(); final PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class); - - final NetworkEnvironment network = mock(NetworkEnvironment.class); - when(network.getResultPartitionManager()).thenReturn(partitionManager); - doThrow(new RuntimeException("buffers")).when(network).registerTask(any(Task.class)); + final NetworkEnvironment network = new NetworkEnvironmentBuilder().build(); Review comment: Yes, you are right when I double checked the codes. I thought this `NetworkEnvironment` is created only once for all the tests. It should be no problem to create new one for each test. I would fix it now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8323: [BP-1.7][FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators
flinkbot commented on issue #8323: [BP-1.7][FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators URL: https://github.com/apache/flink/pull/8323#issuecomment-488000878 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 opened a new pull request #8323: [BP-1.7][FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators
klion26 opened a new pull request #8323: [BP-1.7][FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators URL: https://github.com/apache/flink/pull/8323 ## What is the purpose of the change manual cherry-pick #8263 to release-1.7 ## Verifying this change This change added tests and can be verified as follows: `StatefulOperatorChainedTaskTest#testRestore()` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) @StefanRRichter This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on issue #8313: [FLINK-12184][Coordination]HistoryServerArchieFetcher incompatible with old version
klion26 commented on issue #8313: [FLINK-12184][Coordination]HistoryServerArchieFetcher incompatible with old version URL: https://github.com/apache/flink/pull/8313#issuecomment-487998692 @tillrohrmann thanks for the quick review. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] lamber-ken edited a comment on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode
lamber-ken edited a comment on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode URL: https://github.com/apache/flink/pull/8254#issuecomment-48798 I opened an issue before; it describes a scenario with two steps. First, the Flink job failed and the YARN application did not stop. Second, the YARN ResourceManager restarted for some reason and attempted to restart all applications. The job from the first step was restarted as well, but no jobs could be recovered because the ZK metadata had been removed in the first step, so it failed again, yet the final status is UNDEFINED. These two issues are associated; if you have time, please help me review it, thanks. FLINK-12302 https://github.com/apache/flink/pull/8265 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] lamber-ken edited a comment on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode
lamber-ken edited a comment on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode URL: https://github.com/apache/flink/pull/8254#issuecomment-48798 I opened an issue before; it describes a scenario with two steps. First, the Flink job failed and the YARN application did not stop. Second, the YARN ResourceManager restarted for some reason and attempted to restart all applications. The job from the first step was restarted as well, but no jobs could be recovered because the ZK metadata had been removed in the first step, so it failed again, yet the final status is UNDEFINED. These two issues are associated; if you have time, please help me review it, thanks. https://github.com/apache/flink/pull/8265 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface
flinkbot edited a comment on issue #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface URL: https://github.com/apache/flink/pull/8233#issuecomment-485317615 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @GJL [committer] * ✅ 2. There is [consensus] that the contribution should go into to Flink. - Approved by @GJL [committer] * ❓ 3. Needs [attention] from. * ✅ 4. The change fits into the overall [architecture]. - Approved by @GJL [committer] * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on issue #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface
GJL commented on issue #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface URL: https://github.com/apache/flink/pull/8233#issuecomment-487985822 @flinkbot approve-until architecture This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL edited a comment on issue #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface
GJL edited a comment on issue #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface URL: https://github.com/apache/flink/pull/8233#issuecomment-487985288 Thanks for your contribution to Apache Flink, @shuai-xu! The changes look overall good, and I have left a few minor comments. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on issue #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface
GJL commented on issue #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface URL: https://github.com/apache/flink/pull/8233#issuecomment-487985288 Thanks for your contribution to Apache Flink @shuai-xu. The changes look overall good, and I have left a few minor comments. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface
GJL commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface URL: https://github.com/apache/flink/pull/8233#discussion_r279792812 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.util.Objects; + +/** + * Id identifying {@link ExecutionVertex}. + */ +public class ExecutionVertexID { + private final JobVertexID jobVertexId; + + private final int subtaskIndex; + + public ExecutionVertexID(JobVertexID jobVertexId, int subtaskIndex) { + this.jobVertexId = jobVertexId; Review comment: We should add null checks This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
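For context on the null-check suggestion above, a minimal sketch using Flink's `Preconditions` utility could look like the following; the additional non-negative check on the subtask index is an extra assumption, not part of the review comment:

```java
package org.apache.flink.runtime.scheduler.strategy;

import org.apache.flink.runtime.jobgraph.JobVertexID;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Sketch of the constructor with the suggested null check.
 */
public class ExecutionVertexID {

    private final JobVertexID jobVertexId;

    private final int subtaskIndex;

    public ExecutionVertexID(JobVertexID jobVertexId, int subtaskIndex) {
        // Reject null job vertex ids early instead of failing later with an NPE.
        this.jobVertexId = checkNotNull(jobVertexId);
        // Assumed extra validation: subtask indices are never negative.
        checkArgument(subtaskIndex >= 0, "subtaskIndex must be non-negative");
        this.subtaskIndex = subtaskIndex;
    }
}
```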
[GitHub] [flink] GJL commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface
GJL commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface URL: https://github.com/apache/flink/pull/8233#discussion_r279786441 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingVertex.java ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.util.Collection; + +/** + * Scheduling representation of {@link ExecutionVertex}. + */ +public interface SchedulingVertex { Review comment: I think that's a reasonable proposal. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface
GJL commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface URL: https://github.com/apache/flink/pull/8233#discussion_r279758565 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +/** + * Component that stores the task need to be scheduled and the option for deployment. + */ +public class ExecutionVertexDeploymentOption { + + private final ExecutionVertexID executionVertexId; + + private final DeploymentOption deploymentOption; + + public ExecutionVertexDeploymentOption(ExecutionVertexID executionVertexId, DeploymentOption deploymentOption) { Review comment: I am not sure if I understand correctly. Do you want to replace `DeploymentOption` with a boolean? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface
GJL commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface URL: https://github.com/apache/flink/pull/8233#discussion_r279788463 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerOperations.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy; + +import java.util.Collection; + +/** + * Component which is use by {@link SchedulingStrategy} to commit scheduling decisions. Review comment: _[...] which is used by [...]_ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface
GJL commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface URL: https://github.com/apache/flink/pull/8233#discussion_r279633733 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.util.Collection; + +/** + * Representation of {@link IntermediateResultPartition}. + */ +public interface SchedulingResultPartition { + + /** +* Gets id of the result partition. +* +* @return id of the result partition +*/ + IntermediateResultPartitionID getId(); + + /** +* Gets id of the intermediate result. +* +* @return id of the intermediate result +*/ + IntermediateDataSetID getResultId(); + + /** +* Gets the {@link ResultPartitionType}. +* +* @return result partition type +*/ + ResultPartitionType getPartitionType(); + + /** +* Gets the {@link ResultPartitionState}. +* +* @return result partition state +*/ + ResultPartitionState getState(); + + /** +* Gets the producer of this result partition. +* +* @return producer vertex of this result partition +*/ + SchedulingVertex getProducer(); + + /** +* Gets the consumers of this result partition. +* +* @return Collection of consumer vertices of this result partition +*/ + Collection getConsumers(); + + /** +* State of the result partition. +*/ + enum ResultPartitionState { + EMPTY, // producer is not yet running + PRODUCING, // producer is running + DONE, // producer has terminated + RELEASED // partition has been released Review comment: These comments should be changed to Javadoc. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
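As a rough illustration of the Javadoc suggestion above, the enum constants could carry their descriptions as Javadoc instead of trailing line comments (a sketch, not the merged code):

```java
/**
 * State of the result partition.
 */
public enum ResultPartitionState {

    /** The producer is not yet running. */
    EMPTY,

    /** The producer is running. */
    PRODUCING,

    /** The producer has terminated. */
    DONE,

    /** The partition has been released. */
    RELEASED
}
```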
[GitHub] [flink] GJL commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface
GJL commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface URL: https://github.com/apache/flink/pull/8233#discussion_r279792342 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.util.Objects; + +/** + * Id identifying {@link ExecutionVertex}. + */ +public class ExecutionVertexID { Review comment: I think the name is ok for now since we do not prefix other IDs with _Scheduling_. This ID could also move into the ExecutionGraph. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface
GJL commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface URL: https://github.com/apache/flink/pull/8233#discussion_r279787045 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; + +import java.util.Set; + +/** + * Component which encapsulates the scheduling logic. + * It can react to execution state changes and partition consumable events. + * Moreover, it is responsible for resolving task failures. + */ +public interface SchedulingStrategy { + + /** +* Called when the scheduling is started (initial scheduling operation). +*/ + void startScheduling(); + + /** +* Called whenever vertices need to be restarted (due to task failure). +* +* @param verticesNeedingRestart The tasks need to be restarted. +*/ + void restartTasks(Set verticesNeedingRestart); + + /** +* Called whenever an {@link Execution} changes its state. +* +* @param executionVertexId The id of the task. +* @param executionState The new state of the execution. +*/ + void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState); + + /** +* Called whenever an {@link IntermediateResultPartition} becomes consumable. +* +* @param executionVertexId The id of the task whoes result partition becomes consumable. Review comment: _The id of the producer._ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode
lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode URL: https://github.com/apache/flink/pull/8254#issuecomment-48798 I opened an issue before that describes a scenario with two steps. First, the Flink job failed but the Yarn application did not stop. Second, the Yarn ResourceManager restarted for some reason and attempted to restart all applications; the job from the first step was restarted as well, but no jobs could be recovered because the ZooKeeper metadata had already been removed in the first step, so the attempt failed, yet the final status is UNDEFINED. These two issues are related; if you have time, please help me review it, thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12343) Allow set file.replication in Yarn Configuration
[ https://issues.apache.org/jira/browse/FLINK-12343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830362#comment-16830362 ] Zhenqiu Huang commented on FLINK-12343: --- [~till.rohrmann] I agree that we shouldn't define a default value, so that the replication can easily fall back to the HDFS default on the client side. Since the method createTaskExecutorContext mainly registers the remote files as local resources, we can probably just use the default setting in the Yarn cluster. What do you think? > Allow set file.replication in Yarn Configuration > > > Key: FLINK-12343 > URL: https://issues.apache.org/jira/browse/FLINK-12343 > Project: Flink > Issue Type: Improvement > Components: Command Line Client, Deployment / YARN >Affects Versions: 1.6.4, 1.7.2, 1.8.0 >Reporter: Zhenqiu Huang >Assignee: Zhenqiu Huang >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, FlinkYarnSessionCli upload jars into hdfs with default 3 > replications. From our production experience, we find that 3 replications > will block big job (256 containers) to launch, when the HDFS is slow due to > big workload for batch pipelines. Thus, we want to make the factor > customizable from FlinkYarnSessionCli by adding an option. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12374) Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation.
Jing Zhang created FLINK-12374: -- Summary: Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation. Key: FLINK-12374 URL: https://issues.apache.org/jira/browse/FLINK-12374 Project: Flink Issue Type: Task Components: Table SQL / API Reporter: Jing Zhang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode
lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode URL: https://github.com/apache/flink/pull/8254#issuecomment-487979406 OK, I agree with you. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode
tillrohrmann commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode URL: https://github.com/apache/flink/pull/8254#issuecomment-487977504 It is true that Yarn could try to restart the cluster, depending on the configuration. Flink would then try to re-execute the job since it is part of the Yarn application (no problem with the submitted job graph store). Depending on whether the Flink bug is transient, the failure would either happen again until all restarts are depleted or the job would eventually succeed (potentially producing duplicate results). Given that this is caused by a bug in Flink, I think it is ok to say that we don't give hard guarantees in this case. The important bit is that we report the problem. I would open a PR based on my commit to add this utility. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12343) Allow set file.replication in Yarn Configuration
[ https://issues.apache.org/jira/browse/FLINK-12343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830343#comment-16830343 ] Till Rohrmann commented on FLINK-12343: --- I think the {{ResourceManager}} also sets up some local resources when it creates the {{TaskExecutorContext}}. I guess we should set the same replication factor for these files as well. Apart from that, I think this idea should work. The important bit is to not define a default value for the Flink option so that we can fall back to the HDFS default. > Allow set file.replication in Yarn Configuration > > > Key: FLINK-12343 > URL: https://issues.apache.org/jira/browse/FLINK-12343 > Project: Flink > Issue Type: Improvement > Components: Command Line Client, Deployment / YARN >Affects Versions: 1.6.4, 1.7.2, 1.8.0 >Reporter: Zhenqiu Huang >Assignee: Zhenqiu Huang >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, FlinkYarnSessionCli upload jars into hdfs with default 3 > replications. From our production experience, we find that 3 replications > will block big job (256 containers) to launch, when the HDFS is slow due to > big workload for batch pipelines. Thus, we want to make the factor > customizable from FlinkYarnSessionCli by adding an option. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] azagrebin commented on a change in pull request #8320: [FLINK-12201][network][metrics] Introduce InputGateWithMetrics in Task to increment numBytesIn metric
azagrebin commented on a change in pull request #8320: [FLINK-12201][network][metrics] Introduce InputGateWithMetrics in Task to increment numBytesIn metric URL: https://github.com/apache/flink/pull/8320#discussion_r279770711 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ## @@ -191,7 +191,7 @@ private final ResultPartition[] producedPartitions; - private final SingleInputGate[] inputGates; + private final InputGate[] inputGates; Review comment: I would keep it as `SingleInputGate[]` for now, to avoid the copying loop from `SingleInputGate[]` to `InputGate[]` in the Task constructor. Later, we could move `inputGatesById` completely to `NetworkEnvironment`, when we refactor: - `TaskExecutor.updatePartitions` to use `NetworkEnvironment.update(PartitionInfo)` - try to move `triggerPartitionProducerStateCheck` to `SingleInputGate` and pass `partitionProducerStateChecker` to `SingleInputGate` instead of having it in `TaskActions` `inputGatesById` looks like the last blocker for changing `SingleInputGate[]` to `InputGate[]` in Task. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8320: [FLINK-12201][network][metrics] Introduce InputGateWithMetrics in Task to increment numBytesIn metric
azagrebin commented on a change in pull request #8320: [FLINK-12201][network][metrics] Introduce InputGateWithMetrics in Task to increment numBytesIn metric URL: https://github.com/apache/flink/pull/8320#discussion_r279764419 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java ## @@ -43,26 +46,30 @@ private int channelIndex; - BufferOrEvent(Buffer buffer, int channelIndex, boolean moreAvailable) { + public BufferOrEvent(Buffer buffer, int channelIndex, boolean moreAvailable, int size) { this.buffer = checkNotNull(buffer); this.event = null; this.channelIndex = channelIndex; this.moreAvailable = moreAvailable; + this.size = size; Review comment: Isn't `Buffer.getSizeUnsafe()` already available? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12343) Allow set file.replication in Yarn Configuration
[ https://issues.apache.org/jira/browse/FLINK-12343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830330#comment-16830330 ] Zhenqiu Huang commented on FLINK-12343: --- [~till.rohrmann][~rmetzger] I think we can set the hdfs.replication in the YarnConfiguration of AbstractYarnClusterDescriptor. Since this configuration is only used on the client side, it will not impact the runtime file replications. The reason I initially chose to use the setReplication method is that our org will use S3 in the long term to submit jobs to different cluster management systems, and I wanted to apply the replication to both HDFS and S3. But it looks like S3AFileSystem doesn't implement the method, so I think it is good to use hdfs.replication initially. What do you think? > Allow set file.replication in Yarn Configuration > > > Key: FLINK-12343 > URL: https://issues.apache.org/jira/browse/FLINK-12343 > Project: Flink > Issue Type: Improvement > Components: Command Line Client, Deployment / YARN >Affects Versions: 1.6.4, 1.7.2, 1.8.0 >Reporter: Zhenqiu Huang >Assignee: Zhenqiu Huang >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, FlinkYarnSessionCli upload jars into hdfs with default 3 > replications. From our production experience, we find that 3 replications > will block big job (256 containers) to launch, when the HDFS is slow due to > big workload for batch pipelines. Thus, we want to make the factor > customizable from FlinkYarnSessionCli by adding an option. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] HuangZhenQiu commented on issue #8303: [FLINK-12343]add file replication config for yarn configuration
HuangZhenQiu commented on issue #8303: [FLINK-12343]add file replication config for yarn configuration URL: https://github.com/apache/flink/pull/8303#issuecomment-487964624 @rmetzger @tillrohrmann I think we can set the hdfs.replication in the YarnConfiguration of AbstractYarnClusterDescriptor. Since this configuration is only used on the client side, it will not impact the runtime file replications. The reason I initially chose to use the setReplication method is that our org will use S3 in the long term to submit jobs to different cluster management systems. It looks like S3AFileSystem doesn't implement the method, so I think it is good to use hdfs.replication initially. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
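A hypothetical sketch of the idea discussed above: set the HDFS client replication factor on the client-side Hadoop/Yarn configuration before uploading the job artifacts. The helper class and the way the value is obtained are illustrative assumptions; only `dfs.replication` is a standard HDFS client property.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

/** Illustrative helper, not part of Flink: builds a client-side configuration with a custom replication factor. */
final class ClientReplicationSketch {

    private ClientReplicationSketch() {}

    static Configuration clientConfigurationWithReplication(int replication) {
        YarnConfiguration yarnConfiguration = new YarnConfiguration();
        // "dfs.replication" only affects files written by this client (e.g. the uploaded job jars),
        // not files that already exist on the cluster.
        yarnConfiguration.set("dfs.replication", String.valueOf(replication));
        return yarnConfiguration;
    }
}
```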
[GitHub] [flink] lamber-ken edited a comment on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode
lamber-ken edited a comment on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode URL: https://github.com/apache/flink/pull/8254#issuecomment-487951795 I have a question: if we use `System.exit` to kill the JobManager, will the Yarn ResourceManager attempt the application again (two attempts by default)? It would be bad if the Yarn ResourceManager retried, because the failed Flink job has already removed the ZooKeeper metadata, so the new attempt would fail with a "no jobs to recover" error. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] lamber-ken edited a comment on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode
lamber-ken edited a comment on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode URL: https://github.com/apache/flink/pull/8254#issuecomment-487951795 I have a question: if we use `System.exit` to kill the JobManager, will the Yarn ResourceManager attempt the application again (two attempts by default)? It would be bad if the Yarn ResourceManager retried, because the failed Flink job has already removed the ZooKeeper metadata, so the new attempt would fail because there are no jobs to recover. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] lamber-ken edited a comment on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode
lamber-ken edited a comment on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode URL: https://github.com/apache/flink/pull/8254#issuecomment-487951795 I have a question: if we use `System.exit` to kill the JobManager, will the ResourceManager attempt the application again? It would be bad if the Yarn ResourceManager retried, because the failed Flink job has already removed the ZooKeeper metadata, so the new attempt would fail because there are no jobs to recover. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode
lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode URL: https://github.com/apache/flink/pull/8254#issuecomment-487951795 But another problem is that I can't re-open this issue again. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] lamber-ken edited a comment on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode
lamber-ken edited a comment on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode URL: https://github.com/apache/flink/pull/8254#issuecomment-487950703 Your solution is fantastic, @tillrohrmann! I knew the reason, but I'm sorry that I didn't describe it well. `FutureUtils#assertNoException` is a good tool to solve this problem. My point is that when a Flink job fails on Yarn in detached mode, the Yarn application should fail too. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode
lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode URL: https://github.com/apache/flink/pull/8254#issuecomment-487950703 Your solution is fantastic, @tillrohrmann! I knew the reason, but I'm sorry that I didn't describe it well. `FutureUtils#assertNoException` is a good tool to solve this problem. My point is that when a Flink job fails on Yarn in detached mode, the Yarn application should fail too. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-12184) HistoryServerArchiveFetcher isn't compatible with old version
[ https://issues.apache.org/jira/browse/FLINK-12184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-12184: - Assignee: Congxian Qiu(klion26) > HistoryServerArchiveFetcher isn't compatible with old version > - > > Key: FLINK-12184 > URL: https://issues.apache.org/jira/browse/FLINK-12184 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.6.4, 1.7.2, 1.8.0 >Reporter: Yumeng Zhang >Assignee: Congxian Qiu(klion26) >Priority: Major > Labels: pull-request-available > Time Spent: 0.5h > Remaining Estimate: 0h > > If we have old version json files in the history server, the > HistoryServerArchiveFetcher can't convert the legacy job overview. It will throw > a NullPointerException when trying to convert the legacy job overview because > the tasks don't have the "pending" field. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] tillrohrmann commented on a change in pull request #8313: [FLINK-12184][Coordination]HistoryServerArchieFetcher incompatible with old version
tillrohrmann commented on a change in pull request #8313: [FLINK-12184][Coordination]HistoryServerArchieFetcher incompatible with old version URL: https://github.com/apache/flink/pull/8313#discussion_r279748455 ## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java ## @@ -258,7 +269,13 @@ private static String convertLegacyJobOverview(String legacyOverview) throws IOE int[] tasksPerState = new int[ExecutionState.values().length]; // pending is a mix of CREATED/SCHEDULED/DEPLOYING // to maintain the correct number of task states we have to pick one of them - tasksPerState[ExecutionState.SCHEDULED.ordinal()] = pending; + if (versionLessThan14) { + tasksPerState[ExecutionState.SCHEDULED.ordinal()] = pending; + } else { + tasksPerState[ExecutionState.SCHEDULED.ordinal()] = scheduled; + tasksPerState[ExecutionState.CREATED.ordinal()] = created; + tasksPerState[ExecutionState.DEPLOYING.ordinal()] = deploying; + } Review comment: Do we need the `versionLessThan14` distinction here? We could say that we assign `scheduled = pendingNode.asInt()` in line 257 and get completely rid of `pending`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
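To make the suggestion above concrete, here is a sketch of reading the task count without a version check. The JSON node names other than "pending" follow the variables already visible in the diff and are assumptions about the surrounding code, not the actual patch:

```java
import com.fasterxml.jackson.databind.JsonNode;

/** Illustrative helper: derive the SCHEDULED count from either the legacy "pending" node or the newer "scheduled" node. */
final class LegacyOverviewSketch {

    private LegacyOverviewSketch() {}

    static int scheduledCount(JsonNode tasks) {
        // "pending" is only present in pre-1.4 archives; fall back to "scheduled" otherwise.
        JsonNode pendingNode = tasks.get("pending");
        return pendingNode != null ? pendingNode.asInt() : tasks.get("scheduled").asInt();
    }
}
```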
[GitHub] [flink] yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#issuecomment-487947096 @StefanRRichter This is the PR for the second step of the checkpoint failure handling improvement. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12364) Introduce a CheckpointFailureManager to centralized manage checkpoint failure
[ https://issues.apache.org/jira/browse/FLINK-12364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12364: --- Labels: pull-request-available (was: ) > Introduce a CheckpointFailureManager to centralized manage checkpoint failure > - > > Key: FLINK-12364 > URL: https://issues.apache.org/jira/browse/FLINK-12364 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > This issue tracks the work of the T2 section in the design document: > [https://docs.google.com/document/d/1ce7RtecuTxcVUJlnU44hzcO2Dwq9g4Oyd8_biy94hJc/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
flinkbot commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#issuecomment-487944247 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] yanghua opened a new pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua opened a new pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322 ## What is the purpose of the change *This pull request introduces a CheckpointFailureManager to centrally manage checkpoint failures* ## Brief change log - *Introduce a CheckpointFailureManager to centrally manage checkpoint failures* - *Add a test case for CheckpointFailureManager* ## Verifying this change This change is already covered by existing tests, such as *CheckpointFailureManagerTest*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10317) Configure Metaspace size by default
[ https://issues.apache.org/jira/browse/FLINK-10317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830265#comment-16830265 ] Nico Kruber commented on FLINK-10317: - I'm actually unsure whether we should set a default now. It seems I was mistaken in thinking that GC is only triggered if {{MaxMetaspaceSize}} is set; this is not true, and indeed the first GC starts at {{MetaspaceSize}} and further collections depend on the GC itself. Therefore, setting {{MaxMetaspaceSize}} would ease troubleshooting by failing earlier in case of class loader leaks, but wouldn't solve anything on its own. > Configure Metaspace size by default > --- > > Key: FLINK-10317 > URL: https://issues.apache.org/jira/browse/FLINK-10317 > Project: Flink > Issue Type: Bug > Components: Deployment / Scripts >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Stephan Ewen >Priority: Major > Fix For: 1.7.3, 1.6.5 > > Attachments: Screen Shot 2019-01-07 at 10.08.39.png, Screen Shot > 2019-01-07 at 10.08.47.png, Screenshot 2018-12-18 at 12.14.11.png > > > We should set the size of the JVM Metaspace to a sane default, like > {{-XX:MaxMetaspaceSize=256m}}. > If not set, the JVM offheap memory will grow indefinitely with repeated > classloading and Jitting, eventually exceeding allowed memory on docker/yarn > or similar setups. > It is hard to come up with a good default, however, I believe the error > messages one gets when metaspace is too small are easy to understand (and > easy to take action), while it is very hard to figure out why the memory > footprint keeps growing steadily and infinitely. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] tillrohrmann commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode
tillrohrmann commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode URL: https://github.com/apache/flink/pull/8254#issuecomment-487940319 As I've said before, the archiving handlers belong to Flink and if they are broken, then the system does not give any guarantees. If they were user provided, then I would agree that Flink should not fail in case of exceptions because they are not under its control. The problem is that we don't see this exception. The reason seems to be that we execute the `jobReachedGloballyTerminalState` in a `whenCompleteAsync` callback which returns a future containing the exception. Since we don't check this future, we will never see it. That's also why the `UncaughtExceptionHandler` is never called. I would suggest to add a utility `FutureUtils#assertNoException` which checks returned futures for normal completion. Please take a look at this commit https://github.com/tillrohrmann/flink/commit/524ead44f68ff02d52e8ff561c9e235e5b353123 and let me know whether you would accept this as a fix for the problem. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
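A rough sketch of what such a `FutureUtils#assertNoException` utility could look like; the class name and the choice of forwarding to the thread's uncaught exception handler are assumptions based on the comment above, not the actual commit:

```java
import java.util.concurrent.CompletableFuture;

/** Illustrative sketch: fail loudly if a future that is expected to always complete normally completes exceptionally. */
public final class FutureUtilsSketch {

    private FutureUtilsSketch() {}

    public static void assertNoException(CompletableFuture<?> future) {
        future.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                // Surface the unexpected failure instead of silently dropping it with the unchecked future.
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, throwable);
            }
        });
    }
}
```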
[jira] [Comment Edited] (FLINK-10317) Configure Metaspace size by default
[ https://issues.apache.org/jira/browse/FLINK-10317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16725939#comment-16725939 ] Nico Kruber edited comment on FLINK-10317 at 4/30/19 12:51 PM: --- -Actually limiting metaspace will also trigger its garbage collection (without limit it won't do GC for the metaspace!).-(GC is done once the initial metaspace size is hit and from then on the GC decides - see https://blogs.oracle.com/poonam/about-g1-garbage-collector,-permanent-generation-and-metaspace) Did you test this out with the metaspace limit? If setting metaspace correctly (please verify in the TM logs) does not help, then your issue should be continued in a separate thread like FLINK-9080 or FLINK-11205. was (Author: nicok): Actually limiting metaspace will also trigger its garbage collection (without limit it won't do GC for the metaspace!). Did you test this out with the metaspace limit? If setting metaspace correctly (please verify in the TM logs) does not help, then your issue should be continued in a separate thread like FLINK-9080 or FLINK-11205. > Configure Metaspace size by default > --- > > Key: FLINK-10317 > URL: https://issues.apache.org/jira/browse/FLINK-10317 > Project: Flink > Issue Type: Bug > Components: Deployment / Scripts >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Stephan Ewen >Priority: Major > Fix For: 1.7.3, 1.6.5 > > Attachments: Screen Shot 2019-01-07 at 10.08.39.png, Screen Shot > 2019-01-07 at 10.08.47.png, Screenshot 2018-12-18 at 12.14.11.png > > > We should set the size of the JVM Metaspace to a sane default, like > {{-XX:MaxMetaspaceSize=256m}}. > If not set, the JVM offheap memory will grow indefinitely with repeated > classloading and Jitting, eventually exceeding allowed memory on docker/yarn > or similar setups. > It is hard to come up with a good default, however, I believe the error > messages one gets when metaspace is too small are easy to understand (and > easy to take action), while it is very hard to figure out why the memory > footprint keeps growing steadily and infinitely. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] hequn8128 commented on issue #8230: [FLINK-10977][table] Add streaming non-window FlatAggregate to Table API
hequn8128 commented on issue #8230: [FLINK-10977][table] Add streaming non-window FlatAggregate to Table API URL: https://github.com/apache/flink/pull/8230#issuecomment-487938937 @sunjincheng121 Thank you very much for your continued review and suggestions. I have addressed all your comments and updated the PR. :-) Best, Hequn This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8321: [FLINK-12351][DataStream] Fix AsyncWaitOperator to deep copy StreamElement when object reuse is enabled
flinkbot commented on issue #8321: [FLINK-12351][DataStream] Fix AsyncWaitOperator to deep copy StreamElement when object reuse is enabled URL: https://github.com/apache/flink/pull/8321#issuecomment-487935686 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services