[jira] [Commented] (FLINK-10460) DataDog reporter JsonMappingException
[ https://issues.apache.org/jira/browse/FLINK-10460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750837#comment-16750837 ]

lining commented on FLINK-10460:

[~phoenixjiangnan] See the code in org.apache.flink.metrics.datadog.DatadogHttpReporter#report: when an exception occurs, the key is removed, which will cause a java.util.ConcurrentModificationException.

> DataDog reporter JsonMappingException
>
> Key: FLINK-10460
> URL: https://issues.apache.org/jira/browse/FLINK-10460
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Affects Versions: 1.4.2
> Reporter: Elias Levy
> Priority: Minor
>
> Observed the following error in the TM logs this morning:
> {code:java}
> WARN org.apache.flink.metrics.datadog.DatadogHttpReporter - Failed reporting metrics to Datadog.
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException: (was java.util.ConcurrentModificationException) (through reference chain: org.apache.flink.metrics.datadog.DSeries["series"]->java.util.ArrayList[88]->org.apache.flink.metrics.datadog.DGauge["points"])
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:379)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:339)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:342)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:686)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:672)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:678)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3631)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2998)
>     at org.apache.flink.metrics.datadog.DatadogHttpClient.serialize(DatadogHttpClient.java:90)
>     at org.apache.flink.metrics.datadog.DatadogHttpClient.send(DatadogHttpClient.java:79)
>     at org.apache.flink.metrics.datadog.DatadogHttpReporter.report(DatadogHttpReporter.java:143)
>     at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:417)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>     at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>     at java.lang.Thread.run(Unknown Source)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.LinkedHashMap$LinkedHashIterator.nextNode(Unknown Source)
>     at java.util.LinkedHashMap$LinkedKeyIterator.next(Unknown Source)
>     at java.util.AbstractCollection.addAll(Unknown Source)
>     at java.util.HashSet.<init>(Unknown Source)
>     at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:65)
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:298)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:906)
>
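The "Caused by" frames in the trace above show a `HashSet` being filled from a `LinkedHashMap` key set while the map changes underneath the iterator. The following is a minimal sketch of that failure mode, not the Flink or Kafka code. It is single-threaded so that the exception is deterministic, and `GaugeCmeSketch`, `iterationFailsOnStructuralChange` and `readSafely` are hypothetical names introduced only for illustration. `readSafely` shows one possible way a reporter could skip a gauge whose read fails instead of failing the whole report; whether that suits the Datadog reporter is an assumption.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

final class GaugeCmeSketch {

    /**
     * Modifies a LinkedHashMap while iterating over its key set.
     * Returns true if the iterator detected the change and threw.
     */
    public static boolean iterationFailsOnStructuralChange() {
        Map<String, Integer> partitions = new LinkedHashMap<>();
        partitions.put("topic-0", 0);
        partitions.put("topic-1", 1);
        try {
            for (String key : partitions.keySet()) {
                // Structural modification while the iterator is still live.
                partitions.put(key + "-new", 42);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    /**
     * Hypothetical helper: reads a gauge-like supplier and returns an empty
     * Optional if the read throws, so one bad gauge does not abort a report.
     */
    public static <T> Optional<T> readSafely(Supplier<T> gauge) {
        try {
            return Optional.ofNullable(gauge.get());
        } catch (RuntimeException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println("iteration failed: " + iterationFailsOnStructuralChange());
        System.out.println("healthy gauge present: " + readSafely(() -> 5).isPresent());
    }
}
```

In the reported trace the modification comes from another thread, so the failure is intermittent rather than guaranteed as it is here.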
[jira] [Closed] (FLINK-11356) Check and port JobManagerStartupTest to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann closed FLINK-11356.

Resolution: Fixed
Fix Version/s: 1.8.0

Fixed via bb64862b8b7e3ee35625bfe8cb003e8eada8e949

> Check and port JobManagerStartupTest to new code base if necessary
>
> Key: FLINK-11356
> URL: https://issues.apache.org/jira/browse/FLINK-11356
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Check and port {{JobManagerStartupTest}} to new code base if necessary.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann closed pull request #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base
tillrohrmann closed pull request #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base
URL: https://github.com/apache/flink/pull/7541

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base
tillrohrmann commented on a change in pull request #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7526#discussion_r250494997

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java ##
@@ -225,31 +206,82 @@ public static JobGraph createNonEmptyJobGraph() {
 		return jobGraph;
 	}
-	private static class HATestingDispatcher extends TestingDispatcher {
+	@Nonnull
+	private HATestingDispatcher createDispatcher(TestingHighAvailabilityServices highAvailabilityServices, Queue fencingTokens) throws Exception {
+		return createDispatcher(highAvailabilityServices, fencingTokens, new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null)));
+	}
-		@Nonnull
-		private final BlockingQueue fencingTokens;
+	@Nonnull
+	private HATestingDispatcher createDispatcher(
+		TestingHighAvailabilityServices highAvailabilityServices,
+		@Nullable Queue fencingTokens,
+		Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+		final Configuration configuration = new Configuration();
-		HATestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nonnull BlockingQueue fencingTokens) throws Exception {
-			super(rpcService, endpointId, configuration, highAvailabilityServices, resourceManagerGateway, blobServer, heartbeatServices, jobManagerMetricGroup, metricQueryServicePath, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler);
-			this.fencingTokens = fencingTokens;
-		}
+		return new HATestingDispatcher(
+			rpcService,
+			UUID.randomUUID().toString(),
+			configuration,
+			highAvailabilityServices,
+			new TestingResourceManagerGateway(),
+			new BlobServer(configuration, new VoidBlobStore()),
+			new HeartbeatServices(1000L, 1000L),
+			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+			null,
+			new MemoryArchivedExecutionGraphStore(),
+			jobManagerRunnerFactory,
+			testingFatalErrorHandler,
+			fencingTokens);
+	}
-		@VisibleForTesting
-		CompletableFuture getNumberJobs(Time timeout) {
-			return callAsyncWithoutFencing(
-				() -> listJobs(timeout).get().size(),
-				timeout);
+	private static class HATestingDispatcher extends TestingDispatcher {
+
+		@Nullable
+		private final Queue fencingTokens;
+
+		HATestingDispatcher(
+			RpcService rpcService,
+			String endpointId,
+			Configuration configuration,
+			HighAvailabilityServices highAvailabilityServices,
+			ResourceManagerGateway resourceManagerGateway,
+			BlobServer blobServer,
+			HeartbeatServices heartbeatServices,
+			JobManagerMetricGroup jobManagerMetricGroup,
+			@Nullable String metricQueryServicePath,
+			ArchivedExecutionGraphStore archivedExecutionGraphStore,
+			JobManagerRunnerFactory jobManagerRunnerFactory,
+			FatalErrorHandler fatalErrorHandler,
+			@Nullable Queue fencingTokens) throws Exception {
+			super(
+				rpcService,
+				endpointId,
+				configuration,
+				highAvailabilityServices,
+				resourceManagerGateway,
+				blobServer,
+				heartbeatServices,
+				jobManagerMetricGroup,
+				metricQueryServicePath,
+				archivedExecutionGraphStore,
+
[jira] [Commented] (FLINK-10477) Add rescale button to web interface
[ https://issues.apache.org/jira/browse/FLINK-10477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750818#comment-16750818 ]

lining commented on FLINK-10477:

Hi, [~yanghua]. How is it going?

> Add rescale button to web interface
>
> Key: FLINK-10477
> URL: https://issues.apache.org/jira/browse/FLINK-10477
> Project: Flink
> Issue Type: Improvement
> Components: Webfrontend
> Reporter: Sander Ploegsma
> Assignee: vinoyang
> Priority: Minor
>
> Instead of having to use the REST API to rescale a running job, it would be much easier if we were able to rescale a job from the web interface.
[GitHub] libenchao opened a new pull request #7572: fix typo in JobMaster
libenchao opened a new pull request #7572: fix typo in JobMaster
URL: https://github.com/apache/flink/pull/7572

Fix typo in JobMaster
[jira] [Commented] (FLINK-10707) Improve Cluster Overview in Flink Dashboard
[ https://issues.apache.org/jira/browse/FLINK-10707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750811#comment-16750811 ]

lining commented on FLINK-10707:

Hi, [~wolli]. How is it going?

> Improve Cluster Overview in Flink Dashboard
>
> Key: FLINK-10707
> URL: https://issues.apache.org/jira/browse/FLINK-10707
> Project: Flink
> Issue Type: Sub-task
> Components: Webfrontend
> Reporter: Fabian Wollert
> Assignee: Fabian Wollert
> Priority: Major
> Fix For: 1.8.0
>
> Attachments: flink-dashboard.png
>
> The Flink Dashboard is currently very simple. The following screenshot is a mock of an improvement proposal:
> !flink-dashboard.png|width=806,height=401!
[GitHub] pnowojski commented on a change in pull request #7570: [FLINK-11422] Prefer testing class to mock StreamTask in AbstractStre…
pnowojski commented on a change in pull request #7570: [FLINK-11422] Prefer testing class to mock StreamTask in AbstractStre…
URL: https://github.com/apache/flink/pull/7570#discussion_r250487400

## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ##
@@ -636,4 +597,81 @@ public void close() {
 			// ignore
 		}
 	}
+
+	private class MockStreamTask extends StreamTask {

Review comment:
Can you try to convert this class to a public, non-inner class? This class can definitely evolve into something that is reused across multiple tests, since this is not the only place that uses `mock(StreamTask.class)`.

I think it's easy to do with the "refactor" options in IntelliJ:
1. refactor -> convert to static class (to generate a constructor for passing all of the fields)
2. refactor -> move (to move from inner class to outer level)

I guess the only "problematic" thing to solve is the `streamTaskStateInitializer` field, which is non-final.
[jira] [Created] (FLINK-11423) Propagate the error message from Main method to JarRunHandler
Lavkesh Lahngir created FLINK-11423:

Summary: Propagate the error message from Main method to JarRunHandler
Key: FLINK-11423
URL: https://issues.apache.org/jira/browse/FLINK-11423
Project: Flink
Issue Type: Improvement
Components: Job-Submission
Reporter: Lavkesh Lahngir
Assignee: Lavkesh Lahngir

The jar/run API calls JarRunHandler. The client only receives a simple message like "The main method caused an error" without any more detail. When we throw ProgramInvocationException in PackagedProgram.callMainMethod(), we should add exceptionInMethod.getMessage() too.
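The proposal above can be sketched with plain reflection. This is a hedged illustration, not the Flink implementation: `InvocationFailure` is a hypothetical stand-in for `ProgramInvocationException`, `FailingProgram` is a made-up user program, and `callMain` only approximates what `PackagedProgram.callMainMethod()` is described as doing. The one point it shows is appending the underlying exception's message to the generic text.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

final class MainErrorPropagationSketch {

    /** Hypothetical stand-in for the exception type that reaches the client. */
    public static final class InvocationFailure extends Exception {
        InvocationFailure(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** A made-up user program whose main method fails. */
    public static final class FailingProgram {
        public static void main(String[] args) {
            throw new IllegalStateException("input path /data/in does not exist");
        }
    }

    /** Invokes the entry point's main method and keeps the cause's message. */
    public static void callMain(Class<?> entryPoint, String[] args) throws InvocationFailure {
        try {
            Method main = entryPoint.getMethod("main", String[].class);
            // The cast passes the array as a single argument rather than as varargs.
            main.invoke(null, (Object) args);
        } catch (InvocationTargetException e) {
            Throwable exceptionInMethod = e.getTargetException();
            throw new InvocationFailure(
                "The main method caused an error: " + exceptionInMethod.getMessage(),
                exceptionInMethod);
        } catch (ReflectiveOperationException e) {
            throw new InvocationFailure("Could not invoke the main method: " + e.getMessage(), e);
        }
    }

    /** Returns the message a client would see, or null if main succeeded. */
    public static String messageFor(Class<?> entryPoint) {
        try {
            callMain(entryPoint, new String[0]);
            return null;
        } catch (InvocationFailure e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(messageFor(FailingProgram.class));
    }
}
```

Keeping the original exception as the cause as well as in the message lets a handler log the full stack trace while the client still sees a useful one-line reason.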
[jira] [Updated] (FLINK-10724) Refactor failure handling in check point coordinator
[ https://issues.apache.org/jira/browse/FLINK-10724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-10724:

Labels: pull-request-available (was: )

> Refactor failure handling in check point coordinator
>
> Key: FLINK-10724
> URL: https://issues.apache.org/jira/browse/FLINK-10724
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Reporter: Andrey Zagrebin
> Assignee: vinoyang
> Priority: Major
> Labels: pull-request-available
>
> At the moment, failure handling of asynchronously triggered checkpoints in the checkpoint coordinator happens in different places. We could organise it in a similar way to the failure handling of synchronous checkpoint triggering in *CheckpointTriggerResult*, where we classify error cases. This will simplify, e.g., the integration of an error counter for FLINK-4810.
> See also the discussion here: [https://github.com/apache/flink/pull/6567]
> The specific design document: https://docs.google.com/document/d/1ce7RtecuTxcVUJlnU44hzcO2Dwq9g4Oyd8_biy94hJc/edit?usp=sharing
[GitHub] yanghua opened a new pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator
yanghua opened a new pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator
URL: https://github.com/apache/flink/pull/7571

## What is the purpose of the change

*This pull request refactored failure handling in checkpoint coordinator and reimplemented the `failOnCheckpointingErrors` in checkpoint coordinator*

## Brief change log

- *Refactor PendingCheckpoint#abortXXX*
- *reimplemented the `failOnCheckpointingErrors` in checkpoint coordinator*

## Verifying this change

This change is already covered by existing tests, such as *CheckpointCoordinatorTest#testTriggerAndDeclineWithExecutionFailure* and *CheckpointCoordinatorTest#testTriggerAndDeclineSimpleSavepointWithExecutionFailure*.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)
[GitHub] zhijiangW commented on a change in pull request #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel
zhijiangW commented on a change in pull request #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel
URL: https://github.com/apache/flink/pull/7199#discussion_r250484212

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ##
@@ -68,20 +68,29 @@
 	private final boolean flushAlways;
+	private final boolean isBroadcastSelector;
+
 	private Counter numBytesOut = new SimpleCounter();
 	private Counter numBuffersOut = new SimpleCounter();
 	public RecordWriter(ResultPartitionWriter writer) {
-		this(writer, new RoundRobinChannelSelector());
+		this(writer, new RoundRobinChannelSelector(), false);
 	}
-	@SuppressWarnings("unchecked")
-	public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSelector) {
-		this(writer, channelSelector, false);
+	public RecordWriter(

Review comment:
I am considering how to solve this issue. I agree it seems better to introduce either a builder method or a `RecordWriterBuilder` class for creating the specific RecordWriter. But it seems infeasible to check the ChannelSelector instance in the current architecture, because `BatchTask` and `StreamTask` are in different modules: the `instanceof BroadcastPartitioner` check would only work in the streaming module and would not work for `BatchTask` in the runtime module.

There might be two ways to solve this:
1. Introduce a `ChannelSelector#isBroadcast` method to resolve the module dependency.
2. Check the broadcast mode separately in the `StreamTask` and `BatchTask` classes, which means no unified builder for creating `RecordWriter`.

I slightly prefer the second way in order not to change the interface much, although the instance check would be scattered across two different places. What do you think?
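The first option in the comment above (an `isBroadcast` method on the selector) can be sketched as follows. This is an illustration under assumptions, not the Flink API: `Selector`, `RoundRobin`, `Broadcast` and `Writer` are simplified, hypothetical stand-ins for `ChannelSelector`, `RoundRobinChannelSelector`, `BroadcastPartitioner` and `RecordWriter`. The point is that the writer asks the selector for its mode, so no `instanceof` check against a class from another module is needed.

```java
final class BroadcastSelectorSketch {

    /** Simplified, hypothetical stand-in for a channel selector interface. */
    public interface Selector<T> {
        int selectChannel(T record, int numberOfChannels);

        /** Selectors advertise broadcast mode themselves; the default is non-broadcast. */
        default boolean isBroadcast() {
            return false;
        }
    }

    /** Picks channels in rotation and inherits the non-broadcast default. */
    public static final class RoundRobin<T> implements Selector<T> {
        private int next = -1;

        @Override
        public int selectChannel(T record, int numberOfChannels) {
            next = (next + 1) % numberOfChannels;
            return next;
        }
    }

    /** Declares itself as broadcast; no single channel is ever selected. */
    public static final class Broadcast<T> implements Selector<T> {
        @Override
        public int selectChannel(T record, int numberOfChannels) {
            throw new UnsupportedOperationException("broadcast writes to all channels");
        }

        @Override
        public boolean isBroadcast() {
            return true;
        }
    }

    /** Hypothetical writer that derives its mode from the selector alone. */
    public static final class Writer<T> {
        private final Selector<T> selector;
        private final int numberOfChannels;

        public Writer(Selector<T> selector, int numberOfChannels) {
            this.selector = selector;
            this.numberOfChannels = numberOfChannels;
        }

        /** Returns the channels a record would be written to. */
        public int[] targetChannels(T record) {
            if (selector.isBroadcast()) {
                int[] all = new int[numberOfChannels];
                for (int i = 0; i < numberOfChannels; i++) {
                    all[i] = i;
                }
                return all;
            }
            return new int[] {selector.selectChannel(record, numberOfChannels)};
        }
    }
}
```

The cost of this design is the one the comment names: the interface grows by a method. The second option avoids that by keeping the check in each task class.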
[GitHub] klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250477821

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java ##
@@ -36,43 +36,47 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;
 /**
- * Data transfer utils for {@link RocksDBKeyedStateBackend}.
+ * Help class for downloading RocksDBState.
  */
-class RocksDbStateDataTransfer {
+public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
+	public RocksDBStateDownloader(int restoringThreadNum) {
+		super(restoringThreadNum);
+	}
-	static void transferAllStateDataToDirectory(
+	/**
+	 * Transfer all state data to the target directory using specified number of threads.
+	 *
+	 * @param restoreStateHandle Handles used to retrieve the state data.
+	 * @param dest The target directory which the state data will be stored.
+	 * @param closeableRegistry Which all the inputStream/outputStream will be registered and unregistered.
+	 *
+	 * @throws Exception Thrown if can not transfer all the state data.
+	 */
+	public void transferAllStateDataToDirectory(
 		IncrementalKeyedStateHandle restoreStateHandle,
 		Path dest,
-		int restoringThreadNum,
 		CloseableRegistry closeableRegistry) throws Exception {
 		final Map sstFiles = restoreStateHandle.getSharedState();
 		final Map miscFiles = restoreStateHandle.getPrivateState();
-		downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
-		downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+		downloadDataForAllStateHandles(sstFiles, dest, closeableRegistry);

Review comment:
@azagrebin We cannot share the `closeableRegistry`, because parallel snapshots are supported. If the `closeableRegistry` is shared between parallel snapshots, the snapshot that completes later will hit the exception `IOException("Cannot register Closeable, registry is already closed. Closing argument.")` when registering its input/output streams with the registry (the `closeableRegistry` was closed in `AsyncSnapshotCallable#closeSnapshotIO`), as [the Travis log shows](https://travis-ci.org/apache/flink/jobs/483728783).
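The failure described in the review comment above can be reproduced with a toy registry. This is a sketch under assumptions: `SimpleRegistry` is a hypothetical, heavily simplified class, not Flink's `CloseableRegistry`, and the two "snapshots" are just sequential calls standing in for parallel ones. Only the exception message is taken from the comment.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class ClosedRegistrySketch {

    /** Hypothetical registry that refuses registrations once it has been closed. */
    public static final class SimpleRegistry implements Closeable {
        private final List<Closeable> registered = new ArrayList<>();
        private boolean closed;

        public synchronized void register(Closeable closeable) throws IOException {
            if (closed) {
                // Mirror the described behaviour: close the argument, then fail.
                closeable.close();
                throw new IOException(
                    "Cannot register Closeable, registry is already closed. Closing argument.");
            }
            registered.add(closeable);
        }

        @Override
        public synchronized void close() throws IOException {
            closed = true;
            for (Closeable closeable : registered) {
                closeable.close();
            }
            registered.clear();
        }
    }

    /**
     * Simulates two snapshots. The first finishes and closes its registry;
     * the second then tries to register a stream. Returns true if the
     * second snapshot fails.
     */
    public static boolean laterSnapshotFails(boolean shareRegistry) {
        SimpleRegistry first = new SimpleRegistry();
        SimpleRegistry second = shareRegistry ? first : new SimpleRegistry();
        try {
            first.register(() -> { });   // first snapshot opens a stream
            first.close();               // first snapshot completes
            second.register(() -> { });  // second snapshot is still running
            return false;
        } catch (IOException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("shared registry fails: " + laterSnapshotFails(true));
        System.out.println("separate registries fail: " + laterSnapshotFails(false));
    }
}
```

With one registry per snapshot, closing the first has no effect on the second, which is the argument made for not sharing it.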
[GitHub] klion26 edited a comment on issue #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
klion26 edited a comment on issue #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#issuecomment-457074711

It seems a relevant test failed after reusing the registry; I'm looking into it.
[GitHub] klion26 commented on issue #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
klion26 commented on issue #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#issuecomment-457074711

It seems a relevant test failed; I'm looking into it.
[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r250467958

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java ##
@@ -55,4 +57,13 @@
 	 * @return null or the target topic
 	 */
 	String getTargetTopic(T element);
+
+	/**
+	 *
+	 * @param element The incoming element to be serialized
+	 * @return collection of headers (maybe empty)
+	 */
+	default Iterable> headers(T element) {

Review comment:
@azagrebin, so you suggest having a Flink class to represent a Kafka header instead of Map.Entry?
[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r250457396

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java ##
@@ -55,4 +57,13 @@
 	 * @return null or the target topic
 	 */
 	String getTargetTopic(T element);
+
+	/**
+	 *
+	 * @param element The incoming element to be serialized
+	 * @return collection of headers (maybe empty)
+	 */
+	default Iterable> headers(T element) {

Review comment:
We can't have a default method that creates a `ProducerRecord`, because Kafka 0.8 doesn't have the timestamp parameter in the `ProducerRecord` constructor.
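The approach in the diff above, a default method that returns headers rather than a fully built record, can be sketched as below. This is an illustration only: `Schema` and `TracingSchema` are hypothetical, simplified types, and the header element type `Map.Entry<String, byte[]>` is an assumption, since the generic parameters are not legible in the quoted diff. It shows why a default method keeps existing implementations compiling unchanged.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Map;

final class HeaderSchemaSketch {

    /** Hypothetical, simplified serialization schema; header types are assumed. */
    public interface Schema<T> {
        byte[] serializeValue(T element);

        /** Existing implementations inherit this and emit no headers. */
        default Iterable<Map.Entry<String, byte[]>> headers(T element) {
            return Collections.emptyList();
        }
    }

    /** A made-up schema that opts in by overriding the default. */
    public static final class TracingSchema implements Schema<String> {
        @Override
        public byte[] serializeValue(String element) {
            return element.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public Iterable<Map.Entry<String, byte[]>> headers(String element) {
            Map.Entry<String, byte[]> origin = new AbstractMap.SimpleImmutableEntry<>(
                "origin", "sketch".getBytes(StandardCharsets.UTF_8));
            return Collections.singletonList(origin);
        }
    }

    /** Counts the headers a schema produces for one element. */
    public static <T> int countHeaders(Schema<T> schema, T element) {
        int count = 0;
        for (Map.Entry<String, byte[]> ignored : schema.headers(element)) {
            count++;
        }
        return count;
    }
}
```

Because the schema only returns header data, the version-specific producer code remains the one place that constructs the record, which sidesteps the constructor difference mentioned in the comment.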
[jira] [Assigned] (FLINK-11397) Speed up initialization of AbstractStreamOperatorTestHarness
[ https://issues.apache.org/jira/browse/FLINK-11397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

TisonKun reassigned FLINK-11397:

Assignee: (was: TisonKun)

> Speed up initialization of AbstractStreamOperatorTestHarness
>
> Key: FLINK-11397
> URL: https://issues.apache.org/jira/browse/FLINK-11397
> Project: Flink
> Issue Type: Improvement
> Components: Tests
> Affects Versions: 1.8.0
> Reporter: TisonKun
> Priority: Major
>
> Currently, Kafka connector tests are unbearably slow, which is tracked by FLINK-10603. Investigation shows that the construction and initialization of {{AbstractStreamOperatorTestHarness}} is quite slow.
> -Walking through the code, I was surprised that {{mockTask = mock(StreamTask.class);}} takes a few seconds to finish. If we introduce a test class instead of using a mock framework, the situation might improve.-
> cc [~Zentol] [~pnowojski]
[jira] [Updated] (FLINK-11422) Prefer testing class to mock StreamTask in AbstractStreamOperatorTestHarness
[ https://issues.apache.org/jira/browse/FLINK-11422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-11422:

Labels: pull-request-available (was: )

> Prefer testing class to mock StreamTask in AbstractStreamOperatorTestHarness
>
> Key: FLINK-11422
> URL: https://issues.apache.org/jira/browse/FLINK-11422
> Project: Flink
> Issue Type: Improvement
> Components: Tests
> Affects Versions: 1.8.0
> Reporter: TisonKun
> Assignee: TisonKun
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
[GitHub] TisonKun opened a new pull request #7570: [FLINK-11422] Prefer testing class to mock StreamTask in AbstractStre…
TisonKun opened a new pull request #7570: [FLINK-11422] Prefer testing class to mock StreamTask in AbstractStre…
URL: https://github.com/apache/flink/pull/7570

…amOperatorTestHarness

## What is the purpose of the change

As the title says, we prefer a testing class to a Mockito mock.

## Brief change log

Introduce an inner class `MockStreamTask` and replace the mocked StreamTask with it.

## Verifying this change

This change is already covered by existing tests that use StreamOperatorTestHarness.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

cc @pnowojski
[GitHub] klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250456369

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java ##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.CheckedSupplier;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+/**
+ * Help class for uploading RocksDB state.
+ */
+public class RocksDBStateUploader extends RocksDBStateDownloader {

Review comment:
My bad, I chose the wrong one when using auto-complete.
[jira] [Created] (FLINK-11422) Prefer testing class to mock StreamTask in AbstractStreamOperatorTestHarness
TisonKun created FLINK-11422:

Summary: Prefer testing class to mock StreamTask in AbstractStreamOperatorTestHarness
Key: FLINK-11422
URL: https://issues.apache.org/jira/browse/FLINK-11422
Project: Flink
Issue Type: Improvement
Components: Tests
Affects Versions: 1.8.0
Reporter: TisonKun
Assignee: TisonKun
Fix For: 1.8.0

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
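For readers unfamiliar with the idea behind this issue, here is a minimal sketch of a hand-written testing class used in place of a framework-generated mock. The `Task` interface and `TestingTask` class are hypothetical stand-ins invented for illustration; they are not Flink's `StreamTask` or the change proposed in the ticket.

```java
import java.util.ArrayList;
import java.util.List;

public class TestingTaskSketch {
    /** Hypothetical stand-in for the part of a task that a test harness touches. */
    public interface Task {
        String getName();

        void handleAsyncException(String message, Throwable cause);
    }

    /** Hand-written testing class: plain Java, no mocking framework involved. */
    public static class TestingTask implements Task {
        public final List<Throwable> asyncExceptions = new ArrayList<>();
        private final String name;

        public TestingTask(String name) {
            this.name = name;
        }

        @Override
        public String getName() {
            return name;
        }

        @Override
        public void handleAsyncException(String message, Throwable cause) {
            // Record the failure so a test can assert on it later.
            asyncExceptions.add(cause);
        }
    }

    /** Code under test that depends only on the Task interface. */
    public static String describe(Task task) {
        return "harness for " + task.getName();
    }
}
```

A test constructs `new TestingTask("map")` directly and inspects `asyncExceptions` afterwards, so the behaviour is explicit in the test sources rather than configured through stubbing calls.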
[jira] [Commented] (FLINK-11397) Speed up initialization of AbstractStreamOperatorTestHarness
[ https://issues.apache.org/jira/browse/FLINK-11397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750680#comment-16750680 ]

TisonKun commented on FLINK-11397:

[~pnowojski] I found this "cause" while stepping through the code in a debugger. However, when I replaced the mock class with a testing class, the total test time was not reduced. I'm sorry for mistaking the cause. As for getting rid of the Mockito mock, I will open another Jira issue to track it.

> Speed up initialization of AbstractStreamOperatorTestHarness
>
> Key: FLINK-11397
> URL: https://issues.apache.org/jira/browse/FLINK-11397
> Project: Flink
> Issue Type: Improvement
> Components: Tests
> Affects Versions: 1.8.0
> Reporter: TisonKun
> Priority: Major
>
> Currently Kafka connector tests are unbearably slow, which is tracked by
> FLINK-10603. With investigation, the construction and initialization of
> {{AbstractStreamOperatorTestHarness}} is quite slow.
> -When walk down the code, it amazed me that {{mockTask =
> mock(StreamTask.class);}} cost a few of second to finish. If we can introduce
> a test class instead of mock framework, the situation might be loosen.-
> cc [~Zentol] [~pnowojski]
[jira] [Updated] (FLINK-11397) Speed up initialization of AbstractStreamOperatorTestHarness
[ https://issues.apache.org/jira/browse/FLINK-11397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TisonKun updated FLINK-11397: - Description: Currently Kafka connector tests are unbearably slow, which is tracked by FLINK-10603. With investigation, the construction and initialization of {{AbstractStreamOperatorTestHarness}} is quite slow. -When walk down the code, it amazed me that {{mockTask = mock(StreamTask.class);}} cost a few of second to finish. If we can introduce a test class instead of mock framework, the situation might be loosen.- cc [~Zentol] [~pnowojski] was: Currently Kafka connector tests are unbearably slow, which is tracked by FLINK-10603. With investigation, the construction and initialization of {{AbstractStreamOperatorTestHarness}} is quite slow. When walk down the code, it amazed me that {{mockTask = mock(StreamTask.class);}} cost a few of second to finish. If we can introduce a test class instead of mock framework, the situation might be loosen. cc [~Zentol] [~pnowojski] > Speed up initialization of AbstractStreamOperatorTestHarness > > > Key: FLINK-11397 > URL: https://issues.apache.org/jira/browse/FLINK-11397 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > > Currently Kafka connector tests are unbearably slow, which is tracked by > FLINK-10603. With investigation, the construction and initialization of > {{AbstractStreamOperatorTestHarness}} is quite slow. > -When walk down the code, it amazed me that {{mockTask = > mock(StreamTask.class);}} cost a few of second to finish. If we can introduce > a test class instead of mock framework, the situation might be loosen.- > cc [~Zentol] [~pnowojski] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric
[ https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750675#comment-16750675 ] Shuyi Chen commented on FLINK-7608: --- [~Zentol], [~aljoscha], it appears that after this PR, I can no longer find the latency histogram from the web UI or the rest API. Is it because now it's grouped by operator id, and not vertex id? Is there a way that I can find this metric through the web UI or rest API? Thanks a lot. > LatencyGauge change to histogram metric > > > Key: FLINK-7608 > URL: https://issues.apache.org/jira/browse/FLINK-7608 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: Hai Zhou >Assignee: Hai Zhou >Priority: Major > Fix For: 1.5.0 > > > I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831] to > export metrics the log file. > I found: > {noformat} > -- Gauges > - > .. > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Map.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, > p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}} > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Sink- Unnamed.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, > p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}} > .. > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#discussion_r250453890 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java ## @@ -32,6 +34,49 @@ */ @PublicEvolving public interface KeyedDeserializationSchema extends Serializable, ResultTypeQueryable { + /** +* Kafka record to be deserialized. +* Record consists of key,value pair, topic name, partition offset, headers and a timestamp (if available) +*/ + interface Record { Review comment: Done in 2adbacb This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#discussion_r250453715 ## File path: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka011ConsumerRecord.java ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.shaded.guava18.com.google.common.base.Function; +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.AbstractMap; +import java.util.Map; + +/** + * Extends base Kafka09ConsumerRecord to provide access to Kafka headers. + */ +class Kafka011ConsumerRecord extends Kafka09ConsumerRecord { + /** +* Wraps {@link Header} as Map.Entry. 
+*/ + private static final Function> HEADER_TO_MAP_ENTRY_FUNCTION = + new Function>() { + @Nonnull + @Override + public Map.Entry apply(@Nullable Header header) { + return new AbstractMap.SimpleImmutableEntry<>(header.key(), header.value()); + } + }; + + Kafka011ConsumerRecord(ConsumerRecord consumerRecord) { + super(consumerRecord); + } + + @Override + public Iterable> headers() { + return Iterables.transform(consumerRecord.headers(), HEADER_TO_MAP_ENTRY_FUNCTION); Review comment: Resolved (file removed) in 2adbacb This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#discussion_r250453628 ## File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ## @@ -370,8 +370,8 @@ else if (partitionsRemoved) { keyPayload.get(keyBytes); } - final T value = deserializer.deserialize(keyBytes, valueBytes, - currentPartition.getTopic(), currentPartition.getPartition(), offset); + final T value = deserializer.deserialize( Review comment: Done in 2adbacb This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-457051271

The compile problem arose because flink-connector-kafka-base on master uses Kafka 0.10.2.1, whose ConsumerRecord constructor has a different signature from the one in Kafka 0.8, which flink-connector-kafka-base used when I branched the PR. To be independent of the signature, I changed the test to use a mock instead of creating ConsumerRecord via its constructor.
[jira] [Updated] (FLINK-11301) Travis failed: No output has been received in the last 10m0s
[ https://issues.apache.org/jira/browse/FLINK-11301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng updated FLINK-11301: Attachment: error.log > Travis failed: No output has been received in the last 10m0s > > > Key: FLINK-11301 > URL: https://issues.apache.org/jira/browse/FLINK-11301 > Project: Flink > Issue Type: Bug > Components: Travis >Reporter: Hequn Cheng >Priority: Major > Attachments: error.log > > > -[https://api.travis-ci.org/v3/job/47082/log.txt]- > new log link: > [https://api.travis-ci.org/v3/job/483359744/log.txt|https://api.travis-ci.org/v3/job/483359744/log.txt] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11301) Travis failed: No output has been received in the last 10m0s
[ https://issues.apache.org/jira/browse/FLINK-11301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750659#comment-16750659 ] Hequn Cheng commented on FLINK-11301: - I encountered the same error in my last PR. Log link has been updated in the Description. > Travis failed: No output has been received in the last 10m0s > > > Key: FLINK-11301 > URL: https://issues.apache.org/jira/browse/FLINK-11301 > Project: Flink > Issue Type: Bug > Components: Travis >Reporter: Hequn Cheng >Priority: Major > > -[https://api.travis-ci.org/v3/job/47082/log.txt]- > new log link: > [https://api.travis-ci.org/v3/job/483359744/log.txt|https://api.travis-ci.org/v3/job/483359744/log.txt] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11301) Travis failed: No output has been received in the last 10m0s
[ https://issues.apache.org/jira/browse/FLINK-11301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng updated FLINK-11301: Description: -[https://api.travis-ci.org/v3/job/47082/log.txt]- new log link: [https://api.travis-ci.org/v3/job/483359744/log.txt|https://api.travis-ci.org/v3/job/483359744/log.txt] was:https://api.travis-ci.org/v3/job/47082/log.txt > Travis failed: No output has been received in the last 10m0s > > > Key: FLINK-11301 > URL: https://issues.apache.org/jira/browse/FLINK-11301 > Project: Flink > Issue Type: Bug > Components: Travis >Reporter: Hequn Cheng >Priority: Major > > -[https://api.travis-ci.org/v3/job/47082/log.txt]- > new log link: > [https://api.travis-ci.org/v3/job/483359744/log.txt|https://api.travis-ci.org/v3/job/483359744/log.txt] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-11301) Travis failed: No output has been received in the last 10m0s
[ https://issues.apache.org/jira/browse/FLINK-11301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng reopened FLINK-11301: - > Travis failed: No output has been received in the last 10m0s > > > Key: FLINK-11301 > URL: https://issues.apache.org/jira/browse/FLINK-11301 > Project: Flink > Issue Type: Bug > Components: Travis >Reporter: Hequn Cheng >Priority: Major > > https://api.travis-ci.org/v3/job/47082/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11421) Providing more compilation options for code-generated operators
Liya Fan created FLINK-11421:

Summary: Providing more compilation options for code-generated operators
Key: FLINK-11421
URL: https://issues.apache.org/jira/browse/FLINK-11421
Project: Flink
Issue Type: New Feature
Components: Core
Reporter: Liya Fan
Assignee: Liya Fan

Flink supports some operators (like Calc, Hash Agg, Hash Join, etc.) by code generation. That is, Flink generates their source code dynamically and then compiles it into Java bytecode, which is loaded and executed at runtime.

By default, Flink compiles the generated source code with Janino. This is fast, as the compilation often finishes in hundreds of milliseconds. The generated bytecode, however, is of poor quality. To illustrate, we used the Java Compiler API (JCA) to compile the generated code. Experiments on TPC-H (1 TB) queries show that the E2E time can be more than 10% shorter when operators are compiled by JCA, even though compiling with JCA takes more time (a few seconds).

Therefore, we believe it is beneficial to compile generated code with JCA in the following scenarios:
1) For batch jobs, the E2E time is relatively long, so it is worth spending more time compiling to generate high-quality bytecode.
2) For repeated stream jobs, the generated code is compiled once and run many times. Therefore, it pays to spend more time compiling the first time and to benefit from the higher bytecode quality in later runs.

Based on these observations, we want to provide a compilation option (Janino, JCA, or dynamic) for Flink, so that users can choose the one suitable for their specific scenario and obtain better performance whenever possible.
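To make the JCA path concrete, the following is a minimal sketch of compiling a generated source string with the standard `javax.tools` API and invoking the result by reflection. It assumes a JDK is available at runtime; the class name `GeneratedCalc`, its `apply` method, and the temporary-directory layout are hypothetical and are not taken from Flink's code generator or from the proposal itself.

```java
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class JcaCompileSketch {
    /** Compiles the source of one public top-level class with javac and loads it. */
    public static Class<?> compile(String className, String source) throws Exception {
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            // Happens on a plain JRE, which ships without the compiler.
            throw new IllegalStateException("No system Java compiler; a JDK is required");
        }
        Path dir = Files.createTempDirectory("generated");
        Path file = dir.resolve(className + ".java");
        Files.write(file, source.getBytes(StandardCharsets.UTF_8));

        // Tool.run returns 0 on success; null streams mean System.in/out/err.
        int status = compiler.run(null, null, null, "-d", dir.toString(), file.toString());
        if (status != 0) {
            throw new IllegalStateException("Compilation failed for " + className);
        }
        URLClassLoader loader = URLClassLoader.newInstance(new URL[] {dir.toUri().toURL()});
        return Class.forName(className, true, loader);
    }

    /** Generates a tiny hypothetical "operator", compiles it, and runs it once. */
    public static int runGenerated() throws Exception {
        String source =
            "public class GeneratedCalc {"
                + " public static int apply(int x) { return x * 2 + 1; }"
                + " }";
        Class<?> clazz = compile("GeneratedCalc", source);
        Method apply = clazz.getMethod("apply", int.class);
        return (Integer) apply.invoke(null, 20);
    }
}
```

The sketch writes to disk for brevity; an in-memory file manager would avoid that. The longer compile time described above would be paid on each call to `compile`.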
[jira] [Commented] (FLINK-11416) DISTINCT on a JOIN inside of an UNION is not working
[ https://issues.apache.org/jira/browse/FLINK-11416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750611#comment-16750611 ]

Hequn Cheng commented on FLINK-11416:

Hi [~lordon], thanks for reporting the issue. It is probably a bug in Calcite. I have created a Calcite issue [here|https://issues.apache.org/jira/browse/CALCITE-2801] and described the cause there. As a workaround, you can remove the {{.distinct()}} after {{.select("c")}}. The result should still be correct, as union also performs distinct for you (union equals unionAll + distinct).

> DISTINCT on a JOIN inside of an UNION is not working
>
> Key: FLINK-11416
> URL: https://issues.apache.org/jira/browse/FLINK-11416
> Project: Flink
> Issue Type: Bug
> Components: Table API SQL
> Affects Versions: 1.7.0, 1.7.1
> Reporter: Elias Saalmann
> Priority: Minor
>
> I get an error (Error while applying rule AggregateUnionAggregateRule) when
> having a DISTINCT on a result of a JOIN within an UNION, e.g.
> (
> SELECT DISTINCT c
> FROM a JOIN b ON a = b
> )
> UNION
> (
> SELECT c
> FROM c
> )
> Full stacktrace:
> Exception in thread "main" java.lang.RuntimeException: Error while applying rule AggregateUnionAggregateRule, args [rel#197:LogicalAggregate.NONE(input=rel#196:Subset#21.NONE,group=\{0}), rel#194:LogicalUnion.NONE(input#0=rel#188:Subset#18.NONE,input#1=rel#189:Subset#19.NONE,all=true), rel#221:LogicalAggregate.NONE(input=rel#184:Subset#16.NONE,group=\{2}), rel#164:LogicalTableScan.NONE(table=[_DataSetTable_2])]
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>     at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>     at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>     at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:455)
>     at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:475)
>     at org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:165)
>     at org.myorg.quickstart.TableJob1.main(TableJob1.java:51)
> Caused by: java.lang.IllegalArgumentException: Cannot compute compatible row type for arguments to set op: RecordType(VARCHAR(65536) a, VARCHAR(65536) b, VARCHAR(65536) c), RecordType(VARCHAR(65536) d)
>     at org.apache.calcite.rel.core.SetOp.deriveRowType(SetOp.java:111)
>     at org.apache.calcite.rel.AbstractRelNode.getRowType(AbstractRelNode.java:222)
>     at org.apache.calcite.tools.RelBuilder$Frame.<init>(RelBuilder.java:2065)
>     at org.apache.calcite.tools.RelBuilder$Frame.<init>(RelBuilder.java:2050)
>     at org.apache.calcite.tools.RelBuilder.push(RelBuilder.java:243)
>     at org.apache.calcite.tools.RelBuilder.setOp(RelBuilder.java:1370)
>     at org.apache.calcite.tools.RelBuilder.union(RelBuilder.java:1390)
>     at org.apache.calcite.tools.RelBuilder.union(RelBuilder.java:1380)
>     at org.apache.calcite.rel.rules.AggregateUnionAggregateRule.onMatch(AggregateUnionAggregateRule.java:130)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
>     ... 8 more
> Full example reproducing the error:
> [GitHub|https://github.com/lordon/flink_quickstart/blob/master/src/main/java/org/myorg/quickstart/TableJob1.java]
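The workaround in the comment above rests on the identity "union equals unionAll + distinct". A small sketch with plain Java collections (not the Table API; all names here are made up for illustration) shows why an extra DISTINCT on one input of a UNION does not change the result:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

public class UnionSemanticsSketch {
    /** UNION ALL: concatenation that keeps duplicates. */
    public static <T> List<T> unionAll(List<T> left, List<T> right) {
        List<T> rows = new ArrayList<>(left);
        rows.addAll(right);
        return rows;
    }

    /** DISTINCT: removes duplicates, keeping first-seen order. */
    public static <T> List<T> distinct(List<T> rows) {
        return new ArrayList<>(new LinkedHashSet<>(rows));
    }

    /** UNION: UNION ALL followed by DISTINCT. */
    public static <T> List<T> union(List<T> left, List<T> right) {
        return distinct(unionAll(left, right));
    }

    /** Compares the query with and without DISTINCT on the left input. */
    public static boolean innerDistinctIsRedundant() {
        List<String> joined = Arrays.asList("x", "x", "y");
        List<String> other = Arrays.asList("y", "z");
        return union(distinct(joined), other).equals(union(joined, other));
    }
}
```

Both variants produce the rows x, y, z in this example, which is the property the workaround relies on.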
[jira] [Created] (FLINK-11420) Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBounds
Jürgen Kreileder created FLINK-11420:

Summary: Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBounds
Key: FLINK-11420
URL: https://issues.apache.org/jira/browse/FLINK-11420
Project: Flink
Issue Type: Bug
Components: Type Serialization System
Affects Versions: 1.7.1
Reporter: Jürgen Kreileder

We frequently run into random ArrayIndexOutOfBounds exceptions when Flink tries to serialize Scala case classes containing a Map[String, Any] (Any being String, Long, Int, or Boolean) with the FsStateBackend. (This probably happens with any case class containing a type requiring Kryo, see this thread for instance: [http://mail-archives.apache.org/mod_mbox/flink-user/201710.mbox/%3cCANNGFpjX4gjV=df6tlfeojsb_rhwxs_ruoylqcqv2gvwqtt...@mail.gmail.com%3e])

Disabling asynchronous snapshots seems to work around the problem, so maybe something is not thread-safe in CaseClassSerializer.

Our objects look like this:
{code}
case class Event(timestamp: Long, [...], content: Map[String, Any])
case class EnrichedEvent(event: Event, additionalInfo: Map[String, Any])
{code}
I've looked at a few of the exceptions in a debugger. It always happens when serializing the right-hand side of a tuple from EnrichedEvent -> Event -> content, e.g. 13 from ("foo", 13) or false from ("bar", false).
Stacktrace: {code:java} java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 0 at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157) at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822) at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) at org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:69) at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465) at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69) at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99) at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311) at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95) at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) at java.base/java.lang.Thread.run(Thread.java:834){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
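The report only suspects a thread-safety problem, so the following is a generic sketch rather than a diagnosis. It illustrates one common way to keep a stateful copier safe when several threads are involved: each thread works on its own duplicate. Flink's `TypeSerializer` offers a `duplicate()` method for this purpose; the `StatefulCopier` class below is a hypothetical stand-in and not Flink or Kryo code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class SerializerDuplicateSketch {
    /** Hypothetical copier with internal scratch state; one instance must not be shared. */
    public static class StatefulCopier {
        private final Deque<Integer> scratch = new ArrayDeque<>();

        public String copy(String value) {
            scratch.push(value.length());
            try {
                return new String(value);
            } finally {
                // Would fail if another thread had already emptied the shared stack.
                scratch.pop();
            }
        }

        /** Returns an independent instance with its own scratch state. */
        public StatefulCopier duplicate() {
            return new StatefulCopier();
        }
    }

    /** Runs copies on two threads, each with its own duplicate of the prototype. */
    public static boolean copyOnTwoThreads() throws InterruptedException {
        StatefulCopier prototype = new StatefulCopier();
        boolean[] ok = new boolean[2];
        Thread[] threads = new Thread[2];
        for (int t = 0; t < 2; t++) {
            final int id = t;
            final StatefulCopier own = prototype.duplicate();
            threads[t] = new Thread(() -> {
                boolean all = true;
                for (int i = 0; i < 10_000; i++) {
                    all &= own.copy("event-" + i).equals("event-" + i);
                }
                ok[id] = all;
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        return ok[0] && ok[1];
    }
}
```

Whether a missing duplicate is what triggers the exception above is not established by the ticket; the sketch only shows the pattern.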
[jira] [Updated] (FLINK-11420) Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/FLINK-11420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jürgen Kreileder updated FLINK-11420: - Summary: Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBoundsException (was: Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBounds) > Serialization of case classes containing a Map[String, Any] sometimes throws > ArrayIndexOutOfBoundsException > --- > > Key: FLINK-11420 > URL: https://issues.apache.org/jira/browse/FLINK-11420 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Affects Versions: 1.7.1 >Reporter: Jürgen Kreileder >Priority: Major > > We frequently run into random ArrayIndexOutOfBounds exceptions when flink > tries to serialize Scala case classes containing a Map[String, Any] (Any > being String, Long, Int, or Boolean) with the FsStateBackend. (This probably > happens with any case class containing a type requiring Kryo, see this thread > for instance: > [http://mail-archives.apache.org/mod_mbox/flink-user/201710.mbox/%3cCANNGFpjX4gjV=df6tlfeojsb_rhwxs_ruoylqcqv2gvwqtt...@mail.gmail.com%3e]) > Disabling asynchronous snapshots seems to work around the problem, so maybe > something is not thread-safe in CaseClassSerializer. > Our objects look like this: > {code} > case class Event(timestamp: Long, [...], content: Map[String, Any] > case class EnrichedEvent(event: Event, additionalInfo: Map[String, Any]) > {code} > I've looked at a few of the exceptions in a debugger. It always happens when > serializing the right-hand side a tuple from EnrichedEvent -> Event -> > content, e.g: 13 from ("foo", 13) or false from ("bar", false). 
> Stacktrace: > {code:java} > java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 0 > at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157) > at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822) > at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:69) > at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234) > at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465) > at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465) > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69) > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99) > at > org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311) > at > 
org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.base/java.lang.Thread.run(Thread.java:834){code} > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#discussion_r250397313

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ##
@@ -470,7 +470,20 @@ public void open(Configuration configuration) throws Exception {
  subscribedPartitionsToStartOffsets = new HashMap<>();

- List allPartitions = partitionDiscoverer.discoverPartitions();
+ List allPartitions;
+ try {
+   allPartitions = partitionDiscoverer.discoverPartitions();
+ } finally {
+   if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
+     // when partition discovery is disabled,
+     // we should close partitionDiscoverer after the initial discovery.
+     // otherwise we may have connection leak,
+     // if open method throws an exception after partitionDiscoverer is constructed.
+     // In this case, run method won't be executed
+     // and partitionDiscoverer.close() won't be called.
+     partitionDiscoverer.close();

Review comment: Thank you for the suggestion. Updated. Please ignore the whitespace changes caused by the indentation change of the try-finally block.
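The pattern in the hunk above can be tried in isolation. This is a minimal, self-contained sketch with hypothetical names (`FakeDiscoverer`, `DISCOVERY_DISABLED`); it is not the Flink class, only an illustration of closing a resource in `finally` when no later code path would close it.

```java
import java.util.Arrays;
import java.util.List;

public class DiscovererCloseSketch {
    /** Hypothetical sentinel meaning "no periodic discovery". */
    public static final long DISCOVERY_DISABLED = Long.MIN_VALUE;

    /** Hypothetical stand-in for a discoverer that holds a connection. */
    public static class FakeDiscoverer {
        public boolean closed;
        private final boolean failOnDiscover;

        public FakeDiscoverer(boolean failOnDiscover) {
            this.failOnDiscover = failOnDiscover;
        }

        public List<String> discoverPartitions() {
            if (failOnDiscover) {
                throw new IllegalStateException("discovery failed");
            }
            return Arrays.asList("topic-0", "topic-1");
        }

        public void close() {
            closed = true;
        }
    }

    /** Runs the initial discovery; closes the discoverer if it will never be used again. */
    public static List<String> open(FakeDiscoverer discoverer, long discoveryIntervalMillis) {
        try {
            return discoverer.discoverPartitions();
        } finally {
            // Runs on both the normal and the exceptional path.
            if (discoveryIntervalMillis == DISCOVERY_DISABLED) {
                discoverer.close();
            }
        }
    }
}
```

With discovery disabled the discoverer is closed even when `discoverPartitions()` throws; with a positive interval it stays open and some other code path has to close it later.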
[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#discussion_r250398973

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ##
@@ -732,9 +745,7 @@ public void run() {
  throw new RuntimeException(discoveryLoopError);
  }
  } else {
- // won't be using the discoverer
- partitionDiscoverer.close();
-
+ // partitionDiscoverer is already closed in open method

Review comment: I am not sure it is necessary to wrap `runFetchLoop` in try-finally. 1) When discovery is disabled, we already take care of it in the open method. 2) When discovery is enabled, cancel() takes care of it. What if some other place in the run method throws an exception? We always need cancel() to clean up resources if necessary.
[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#discussion_r250397468

## File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ##
@@ -464,6 +466,34 @@ public void go() throws Exception {
  runThread.sync();
  }

+ @Test
+ public void testClosePartitionDiscovererWhenOpenThrowException() throws Exception {
+   final AbstractPartitionDiscoverer mockPartitionDiscoverer = mock(AbstractPartitionDiscoverer.class);
+   final AtomicBoolean partitionDiscovererClosed = new AtomicBoolean(false);
+   doAnswer((i) -> {
+     partitionDiscovererClosed.set(true);
+     return null;
+   }).when(mockPartitionDiscoverer).close();
+
+   final DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumerThrowExceptionInOpen<>(mockPartitionDiscoverer);
+   consumer.setCommitOffsetsOnCheckpoints(false); // disabling offset committing should override everything
+
+   try {
+     setupConsumer(
+       consumer,
+       false,
+       null,
+       false, // enable checkpointing; auto commit should be ignored
+       0,
+       1);

Review comment: Good catch. Fixed.
[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an… URL: https://github.com/apache/flink/pull/7020#discussion_r250397435 ## File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ## @@ -464,6 +466,34 @@ public void go() throws Exception { runThread.sync(); } + @Test + public void testClosePartitionDiscovererWhenOpenThrowException() throws Exception { + final AbstractPartitionDiscoverer mockPartitionDiscoverer = mock(AbstractPartitionDiscoverer.class); Review comment: great idea. done.
[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an… URL: https://github.com/apache/flink/pull/7020#discussion_r250397313 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ## @@ -470,7 +470,20 @@ public void open(Configuration configuration) throws Exception { subscribedPartitionsToStartOffsets = new HashMap<>(); - List allPartitions = partitionDiscoverer.discoverPartitions(); + List allPartitions; + try { + allPartitions = partitionDiscoverer.discoverPartitions(); + } finally { + if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) { + // when partition discovery is disabled, + // we should close partitionDiscoverer after the initial discovery. + // otherwise we may have connection leak, + // if open method throws an exception after partitionDiscoverer is constructed. + // In this case, run method won't be executed + // and partitionDiscoverer.close() won't be called. + partitionDiscoverer.close(); Review comment: thank you for the suggestion. updated. please ignore white space changes caused by the new try-finally block.
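The try-finally pattern discussed in this review thread can be sketched in isolation. This is a minimal illustration only, not the Flink code: `SketchDiscoverer`, `OpenSketch`, and the `DISCOVERY_DISABLED` sentinel value are hypothetical stand-ins for `AbstractPartitionDiscoverer`, `FlinkKafkaConsumerBase#open`, and `PARTITION_DISCOVERY_DISABLED`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a partition discoverer that holds a connection.
class SketchDiscoverer implements AutoCloseable {
    private final boolean failOnDiscover;
    private boolean closed;

    SketchDiscoverer(boolean failOnDiscover) {
        this.failOnDiscover = failOnDiscover;
    }

    List<String> discoverPartitions() {
        if (failOnDiscover) {
            throw new IllegalStateException("discovery failed");
        }
        return Arrays.asList("topic-0", "topic-1");
    }

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

class OpenSketch {
    // Assumed sentinel meaning "no periodic discovery"; the real constant may differ.
    static final long DISCOVERY_DISABLED = Long.MIN_VALUE;

    // Runs the initial discovery and, when periodic discovery is disabled,
    // closes the discoverer whether or not discovery threw.
    static List<String> open(SketchDiscoverer discoverer, long discoveryIntervalMillis) {
        try {
            return discoverer.discoverPartitions();
        } finally {
            if (discoveryIntervalMillis == DISCOVERY_DISABLED) {
                discoverer.close();
            }
        }
    }
}
```

When discovery is enabled, the sketch leaves the discoverer open on purpose, since in the thread above cleanup for that case is delegated to `cancel()`.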
[jira] [Updated] (FLINK-11008) Speed up upload checkpoint files using multi-thread
[ https://issues.apache.org/jira/browse/FLINK-11008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey Zagrebin updated FLINK-11008: Fix Version/s: 1.8.0 > Speed up upload checkpoint files using multi-thread > --- > > Key: FLINK-11008 > URL: https://issues.apache.org/jira/browse/FLINK-11008 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > As [FLINK-10461|https://issues.apache.org/jira/browse/FLINK-10461] did, we > could speed up upload checkpoint files by using multi-thread. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
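As a rough illustration of the idea in FLINK-11008, uploading several files concurrently can be sketched with a fixed thread pool and `CompletableFuture`. This is a sketch under stated assumptions, not the implementation in the pull request: `ParallelUploadSketch`, `uploadOne`, and the string "handles" are hypothetical, and the real code works with state handles and checkpoint streams.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelUploadSketch {

    // Hypothetical upload of a single file; returns a handle for the uploaded data.
    static String uploadOne(String fileName) {
        return "handle:" + fileName;
    }

    // Submits one upload task per file, then waits for all of them.
    static Map<String, String> uploadAll(List<String> files, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            Map<String, CompletableFuture<String>> futures = new LinkedHashMap<>();
            for (String file : files) {
                futures.put(file, CompletableFuture.supplyAsync(() -> uploadOne(file), executor));
            }
            Map<String, String> handles = new LinkedHashMap<>();
            for (Map.Entry<String, CompletableFuture<String>> entry : futures.entrySet()) {
                // join() rethrows a failed upload as an unchecked CompletionException.
                handles.put(entry.getKey(), entry.getValue().join());
            }
            return handles;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

With `threads` set to 1 the sketch degenerates to sequential uploads, which mirrors the notion of a configurable number of transfer threads.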
[jira] [Assigned] (FLINK-11397) Speed up initialization of AbstractStreamOperatorTestHarness
[ https://issues.apache.org/jira/browse/FLINK-11397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TisonKun reassigned FLINK-11397: Assignee: TisonKun > Speed up initialization of AbstractStreamOperatorTestHarness > > > Key: FLINK-11397 > URL: https://issues.apache.org/jira/browse/FLINK-11397 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > > Currently Kafka connector tests are unbearably slow, which is tracked by > FLINK-10603. Investigation shows that the construction and initialization of > {{AbstractStreamOperatorTestHarness}} is quite slow. > Walking through the code, I was surprised that {{mockTask = > mock(StreamTask.class);}} takes a few seconds to finish. If we introduce > a test class instead of using a mock framework, the situation might improve. > cc [~Zentol] [~pnowojski] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r250325106 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java ## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.CheckedSupplier; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +/** + * Help class for uploading RocksDB state. + */ +public class RocksDBStateUploader extends RocksDBStateDownloader { Review comment: `extends RocksDBStateDataTransfer` not `RocksDBStateDownloader`?
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r250317750 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java ## @@ -97,6 +97,10 @@ protected void cancel() { } } + protected CloseableRegistry getSnapshotCloseableRegistry() { + return snapshotCloseableRegistry; Review comment: could we just make `snapshotCloseableRegistry` protected and use it directly in `RocksIncrementalSnapshotStrategy`? I think we do not need `(un)registerCloseableForCancellation` methods either afterwards and replace calls to them with direct calls to `snapshotCloseableRegistry.(un)registerCloseable`.
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r250323327 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ## @@ -701,17 +702,22 @@ public RocksDBNativeMetricOptions getMemoryWatcherOptions() { } /** -* Gets the thread number will used for downloading files from DFS when restore. +* Gets the number of threads used to transfer files while snapshotting/restoring. */ - public int getNumberOfRestoringThreads() { - return numberOfRestoringThreads == UNDEFINED_NUMBER_OF_RESTORING_THREADS ? - RocksDBOptions.CHECKPOINT_RESTORE_THREAD_NUM.defaultValue() : numberOfRestoringThreads; + public int getNumberOfTransferingThreads() { + return numberOfTransferingThreads == UNDEFINED_NUMBER_OF_TRANSFERING_THREADS ? + CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue() : numberOfTransferingThreads; } - public void setNumberOfRestoringThreads(int numberOfRestoringThreads) { - Preconditions.checkArgument(numberOfRestoringThreads > 0, - "The number of threads used to download files from DFS in RocksDBStateBackend should > 0."); - this.numberOfRestoringThreads = numberOfRestoringThreads; + /** +* Sets the number of threads used to transfer files while snapshotting/restoring. +* +* @param numberOfTransferingThreads The number of threads used to download files from DFS while restoring. Review comment: `used to download files from DFS while restoring` -> `to transfer files while snapshotting/restoring`
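The getter/setter pair in this diff follows a "sentinel means use the default" pattern, which can be sketched on its own. This is a minimal sketch, not the `RocksDBStateBackend` code: the class name `TransferThreadsSketch`, the sentinel value `-1`, and the default of `1` are assumptions made for illustration.

```java
class TransferThreadsSketch {
    // Assumed sentinel for "not configured"; the real constant may use another value.
    static final int UNDEFINED_NUMBER_OF_TRANSFERING_THREADS = -1;
    // Assumed default, standing in for the configured option's default value.
    static final int DEFAULT_TRANSFER_THREADS = 1;

    private int numberOfTransferingThreads = UNDEFINED_NUMBER_OF_TRANSFERING_THREADS;

    // Gets the number of threads used to transfer files while snapshotting/restoring.
    int getNumberOfTransferingThreads() {
        return numberOfTransferingThreads == UNDEFINED_NUMBER_OF_TRANSFERING_THREADS
                ? DEFAULT_TRANSFER_THREADS
                : numberOfTransferingThreads;
    }

    // Sets the number of threads used to transfer files while snapshotting/restoring.
    void setNumberOfTransferingThreads(int numberOfTransferingThreads) {
        if (numberOfTransferingThreads <= 0) {
            throw new IllegalArgumentException(
                    "The number of threads used to transfer files should be greater than zero.");
        }
        this.numberOfTransferingThreads = numberOfTransferingThreads;
    }
}
```

Because the setter rejects non-positive values, the sentinel can never be stored through it, so the getter falls back to the default only when nothing was configured.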
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r250322353 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java ## @@ -47,10 +47,10 @@ HEAP.name(), ROCKSDB.name())); /** -* The number of threads used to download files from DFS in RocksDBStateBackend. +* The number of threads used to transfer(download and upload) files in RocksDBStateBackend. Review comment: space in `transfer(download` before `(`, also below in similar places
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r250324277 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java ## @@ -36,43 +36,47 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService; /** - * Data transfer utils for {@link RocksDBKeyedStateBackend}. + * Help class for downloading RocksDBState. */ -class RocksDbStateDataTransfer { +public class RocksDBStateDownloader extends RocksDBStateDataTransfer { + public RocksDBStateDownloader(int restoringThreadNum) { + super(restoringThreadNum); + } - static void transferAllStateDataToDirectory( + /** +* Transfer all state data to the target directory using specified number of threads. +* +* @param restoreStateHandle Handles used to retrieve the state data. +* @param dest The target directory which the state data will be stored. +* @param closeableRegistry Which all the inputStream/outputStream will be registered and unregistered. +* +* @throws Exception Thrown if can not transfer all the state data. 
+*/ + public void transferAllStateDataToDirectory( IncrementalKeyedStateHandle restoreStateHandle, Path dest, - int restoringThreadNum, CloseableRegistry closeableRegistry) throws Exception { final Map sstFiles = restoreStateHandle.getSharedState(); final Map miscFiles = restoreStateHandle.getPrivateState(); - downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry); - downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); + downloadDataForAllStateHandles(sstFiles, dest, closeableRegistry); Review comment: could `closeableRegistry` be shared the same way as `executorService` in `RocksDBStateDataTransfer`? Then we do not need to pass to all class methods.
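The reviewer's suggestion, holding the registry in the shared base class instead of threading it through every method, might look roughly like the sketch below. All names here (`SketchRegistry`, `SketchTransfer`, `SketchDownloader`) are hypothetical stand-ins; whether this fits the real classes depends on the registry having the same lifetime as the transfer object, which the thread does not settle.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, much simplified stand-in for a closeable registry.
class SketchRegistry {
    private final List<Closeable> registered = new ArrayList<>();

    void registerCloseable(Closeable closeable) {
        registered.add(closeable);
    }

    void unregisterCloseable(Closeable closeable) {
        registered.remove(closeable);
    }

    int size() {
        return registered.size();
    }
}

// Base class owning the shared resource, so subclasses need no extra parameter.
abstract class SketchTransfer {
    protected final SketchRegistry closeableRegistry;

    protected SketchTransfer(SketchRegistry closeableRegistry) {
        this.closeableRegistry = closeableRegistry;
    }
}

class SketchDownloader extends SketchTransfer {
    SketchDownloader(SketchRegistry closeableRegistry) {
        super(closeableRegistry);
    }

    // Registers the stream for the duration of the (simulated) download.
    int download(String fileName) {
        Closeable stream = () -> { };
        closeableRegistry.registerCloseable(stream);
        try {
            return fileName.length(); // placeholder for the bytes copied
        } finally {
            closeableRegistry.unregisterCloseable(stream);
        }
    }
}
```

The trade-off is that a per-call parameter lets each restore use its own registry, while a field ties the transfer object to one registry for its whole lifetime.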
[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread
azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread URL: https://github.com/apache/flink/pull/7351#discussion_r250319960 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -517,6 +516,10 @@ public void restore(Collection restoreState) throws Exception LOG.info("Initializing RocksDB keyed state backend."); + if (LOG.isDebugEnabled()) { Review comment: this log seems to be a duplicate of the following one in try/if/else.
[GitHub] azagrebin commented on issue #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
azagrebin commented on issue #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#issuecomment-456912423 Thanks for the review @tillrohrmann ! I pushed a commit to address the concerns.
[GitHub] alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#issuecomment-456910348 Weird: the build is fine locally using 1.8.0_181-b13, but fails on Travis with 1.8.0_151-b12:
>17:38:32.202 [ERROR] /home/travis/build/apache/flink/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java:[105,71] cannot infer type arguments for org.apache.kafka.clients.consumer.ConsumerRecord<>
for the following lines:
```
final ConsumerRecord consumerRecord = new ConsumerRecord<>( "topic#1", 3, serializedKey, serializedValue, 4L);
```
Any ideas?
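One thing that is sometimes worth trying with a "cannot infer type arguments" error is spelling out the type arguments instead of using the diamond operator, since the compiler then often reports the underlying mismatch more directly. The sketch below only illustrates that technique: `SketchRecord` is a hypothetical generic class standing in for Kafka's `ConsumerRecord`, its constructor parameter order is an assumption for this example, and nothing here is confirmed to be the cause or the fix for the Travis failure above.

```java
// Hypothetical generic record; not the Kafka class.
class SketchRecord<K, V> {
    final String topic;
    final int partition;
    final long offset;
    final K key;
    final V value;

    SketchRecord(String topic, int partition, long offset, K key, V value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.key = key;
        this.value = value;
    }
}

class ExplicitTypeArgsSketch {
    // Explicit type arguments remove the inference step entirely, so any
    // remaining error points at the constructor arguments themselves.
    static SketchRecord<byte[], byte[]> build(byte[] serializedKey, byte[] serializedValue) {
        return new SketchRecord<byte[], byte[]>("topic#1", 3, 4L, serializedKey, serializedValue);
    }
}
```

If the explicit form also fails to compile, it may be worth checking the argument order and types against the constructor being targeted.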
[jira] [Created] (FLINK-11419) StreamingFileSink fails to recover after taskmanager failure
Edward Rojas created FLINK-11419: Summary: StreamingFileSink fails to recover after taskmanager failure Key: FLINK-11419 URL: https://issues.apache.org/jira/browse/FLINK-11419 Project: Flink Issue Type: Bug Components: filesystem-connector Affects Versions: 1.7.1 Reporter: Edward Rojas If a job with a StreamingFileSink sending data to HDFS is running in a cluster with multiple taskmanagers and the taskmanager executing the job goes down (for some reason), the job fails to recover with "Missing data in tmp file" because it is not able to truncate the file. Here is the full stack trace: {code:java} java.io.IOException: Missing data in tmp file: hdfs://path/to/hdfs/2019-01-20/.part-0-0.inprogress.823f9c20-3594-4fe3-ae8c-f57b6c35e191 at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:93) at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396) at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to TRUNCATE_FILE /path/to/hdfs/2019-01-20/.part-0-0.inprogress.823f9c20-3594-4fe3-ae8c-f57b6c35e191 for DFSClient_NONMAPREDUCE_-2103482360_62 on x.xxx.xx.xx because this file lease is currently owned by DFSClient_NONMAPREDUCE_1834204750_59 on x.xx.xx.xx at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3190) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInternal(FSNamesystem.java:2282) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInt(FSNamesystem.java:2228) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2198) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1056) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:622) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351) 
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347) at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489) at org.apache.hadoop.ipc.Client.call(Client.java:1435) at org.apache.hadoop.ipc.Client.call(Client.java:1345) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227) at
[GitHub] tweise merged pull request #7559: [FLINK-11048] Mark new RemoteStreamEnvironment constructor PublicEvolving
tweise merged pull request #7559: [FLINK-11048] Mark new RemoteStreamEnvironment constructor PublicEvolving URL: https://github.com/apache/flink/pull/7559
[jira] [Closed] (FLINK-11282) Merge StreamRecordWriter into RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-11282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-11282. -- Resolution: Fixed merged commit b12d392 into apache:master > Merge StreamRecordWriter into RecordWriter > -- > > Key: FLINK-11282 > URL: https://issues.apache.org/jira/browse/FLINK-11282 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 20m > Remaining Estimate: 0h > > {{StreamRecordWriter}} is only used for streaming job which extends > {{RecordWriter}}. The only difference in {{StreamRecordWriter}} is > maintaining the {{OutputFlusher}} thread which can be migrated into > {{RecordWriter}} because the {{flushAlways}} property in {{RecordWriter}} has > relationship with {{OutputFlusher}}. > To do so, we can introduce the special {{BroadcastRecordWriter}} which > extends {{RecordWriter}} for improving broadcast selector for > [FLINK-10662|https://issues.apache.org/jira/browse/FLINK-10662] . And the > {{RecordWriter}} division is unified for both streaming and batch jobs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] pnowojski merged pull request #7438: [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter
pnowojski merged pull request #7438: [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter URL: https://github.com/apache/flink/pull/7438
[jira] [Comment Edited] (FLINK-11397) Speed up initialization of AbstractStreamOperatorTestHarness
[ https://issues.apache.org/jira/browse/FLINK-11397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750256#comment-16750256 ] Piotr Nowojski edited comment on FLINK-11397 at 1/23/19 5:34 PM: - [~Tison] thanks for reporting the issue. Can you write how did you investigate it? Don't get me wrong, I'm a big opponent of using mockito for things like that and it would be very valuable on it's own to replace this {{mockTask = mock(StreamTask.class)}} with a proper mock or some dummy implementation, but I couldn't fully reproduce your findings. I wrote a simple micro benchmark for both {{mock(StreamTask.class)}} and {{new OneInputStreamOperatorTestHarness()}} constructor and it seemed like first one takes under 0.01ms to completed. Second one is significantly slower, somewhere between 0.1 and 1ms, but I couldn't get the more precise number nor I couldn't investigate/profile the code what's taking so much time because apparently {{new OneInputStreamOperatorTestHarness()}} is leaking some resources and I was getting: {code:java} java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:717) at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.(IOManagerAsync.java:88) at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.(IOManagerAsync.java:69) at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.(IOManagerAsync.java:60) at org.apache.flink.runtime.operators.testutils.MockEnvironment.(MockEnvironment.java:144) at org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder.build(MockEnvironmentBuilder.java:126) at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.(AbstractStreamOperatorTestHarness.java:164) at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.(OneInputStreamOperatorTestHarness.java:92) at 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.(OneInputStreamOperatorTestHarness.java:83) at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.(OneInputStreamOperatorTestHarness.java:75) {code} Regardless of that, 1ms is still pretty far away from "a few seconds". But as I wrote above - even ignoring the speed up I would more then happy to review/merge the code that gets rid of this mockito mock :) was (Author: pnowojski): [~Tison] thanks for reporting the issue. Can you tell how have you investigated it? Don't get me wrong, I'm a big opponent of using mockito for things like that and it would be very valuable on it's own to replace this {{mockTask = mock(StreamTask.class)}} with a proper mock or some dummy implementation, but I couldn't fully reproduce your findings. I wrote a simple micro benchmark for both {{mock(StreamTask.class)}} and {{new OneInputStreamOperatorTestHarness()}} constructor and it seemed like first one takes under 0.01ms to completed. 
Second one is significantly slower, somewhere between 0.1 and 1ms, but I couldn't get the more precise number nor I couldn't investigate/profile the code what's taking so much time because apparently {{new OneInputStreamOperatorTestHarness()}} is leaking some resources and I was getting: {code:java} java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:717) at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.(IOManagerAsync.java:88) at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.(IOManagerAsync.java:69) at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.(IOManagerAsync.java:60) at org.apache.flink.runtime.operators.testutils.MockEnvironment.(MockEnvironment.java:144) at org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder.build(MockEnvironmentBuilder.java:126) at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.(AbstractStreamOperatorTestHarness.java:164) at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.(OneInputStreamOperatorTestHarness.java:92) at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.(OneInputStreamOperatorTestHarness.java:83) at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.(OneInputStreamOperatorTestHarness.java:75) {code} Regardless of that, 1ms is still pretty far away from "a few seconds". But as I wrote above - even ignoring the speed up I would more then happy to review/merge the code that gets rid of this mockito mock :) > Speed up initialization of AbstractStreamOperatorTestHarness > > > Key: FLINK-11397 > URL:
[jira] [Commented] (FLINK-11397) Speed up initialization of AbstractStreamOperatorTestHarness
[ https://issues.apache.org/jira/browse/FLINK-11397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750256#comment-16750256 ] Piotr Nowojski commented on FLINK-11397: [~Tison] thanks for reporting the issue. Can you tell how you investigated it? Don't get me wrong, I'm a big opponent of using mockito for things like that and it would be very valuable on its own to replace this {{mockTask = mock(StreamTask.class)}} with a proper mock or some dummy implementation, but I couldn't fully reproduce your findings. I wrote a simple micro benchmark for both {{mock(StreamTask.class)}} and the {{new OneInputStreamOperatorTestHarness()}} constructor and it seemed like the first one takes under 0.01ms to complete. The second one is significantly slower, somewhere between 0.1 and 1ms, but I couldn't get a more precise number, nor could I investigate/profile what is taking so much time, because apparently {{new OneInputStreamOperatorTestHarness()}} is leaking some resources and I was getting: {code:java} java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:717) at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.<init>(IOManagerAsync.java:88) at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.<init>(IOManagerAsync.java:69) at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.<init>(IOManagerAsync.java:60) at org.apache.flink.runtime.operators.testutils.MockEnvironment.<init>(MockEnvironment.java:144) at org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder.build(MockEnvironmentBuilder.java:126) at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.<init>(AbstractStreamOperatorTestHarness.java:164) at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.<init>(OneInputStreamOperatorTestHarness.java:92) at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.<init>(OneInputStreamOperatorTestHarness.java:83) at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.<init>(OneInputStreamOperatorTestHarness.java:75) {code} Regardless of that, 1ms is still pretty far away from "a few seconds". But as I wrote above, even ignoring the speed up I would be more than happy to review/merge the code that gets rid of this mockito mock :) > Speed up initialization of AbstractStreamOperatorTestHarness > > > Key: FLINK-11397 > URL: https://issues.apache.org/jira/browse/FLINK-11397 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.8.0 >Reporter: TisonKun >Priority: Major > > Currently Kafka connector tests are unbearably slow, which is tracked by > FLINK-10603. Investigation shows that the construction and initialization of > {{AbstractStreamOperatorTestHarness}} is quite slow. > Walking through the code, I was surprised that {{mockTask = > mock(StreamTask.class);}} takes a few seconds to finish. If we introduce > a test class instead of using a mock framework, the situation might improve. > cc [~Zentol] [~pnowojski] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11418) Unable to build docs in Docker image
[ https://issues.apache.org/jira/browse/FLINK-11418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750185#comment-16750185 ]

Chesnay Schepler commented on FLINK-11418:

Did you try following the Docker instructions that were added recently? (see [here|https://flink.apache.org/improve-website.html#update-or-extend-the-documentation])

> Unable to build docs in Docker image
>
> Key: FLINK-11418
> URL: https://issues.apache.org/jira/browse/FLINK-11418
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.8.0
> Reporter: Robert Metzger
> Priority: Major
>
> Running
> {code:java}
> cd flink/docs/docker
> ./run.sh{code}
>
> And then in the container
> {code:java}
> Welcome to Apache Flink docs
> To build, execute
> ./build_docs.sh
> To watch and regenerate automatically
> ./build_docs.sh -p
> and access http://localhost:4000
> bash-4.4$ ./build_docs.sh -p
> Traceback (most recent call last):
> 2: from /usr/local/bin/bundle:23:in `<main>'
> 1: from /usr/share/rubygems/rubygems.rb:308:in `activate_bin_path'
> /usr/share/rubygems/rubygems.rb:289:in `find_spec_for_exe': can't find gem bundler (>= 0.a) with executable bundle (Gem::GemNotFoundException){code}
> I believe there's something wrong.
[jira] [Commented] (FLINK-9920) BucketingSinkFaultToleranceITCase fails on travis
[ https://issues.apache.org/jira/browse/FLINK-9920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750226#comment-16750226 ]

Till Rohrmann commented on FLINK-9920:

Another instance: https://api.travis-ci.org/v3/job/483415515/log.txt

> BucketingSinkFaultToleranceITCase fails on travis
>
> Key: FLINK-9920
> URL: https://issues.apache.org/jira/browse/FLINK-9920
> Project: Flink
> Issue Type: Bug
> Components: filesystem-connector
> Affects Versions: 1.6.0, 1.7.0
> Reporter: Chesnay Schepler
> Assignee: Aljoscha Krettek
> Priority: Critical
> Labels: test-stability
> Fix For: 1.8.0
>
> https://travis-ci.org/zentol/flink/jobs/407021898
> {code}
> Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 13.082 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase
> runCheckpointedProgram(org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase) Time elapsed: 5.696 sec <<< FAILURE!
> java.lang.AssertionError: Read line does not match expected pattern.
> at org.junit.Assert.fail(Assert.java:88)
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase.postSubmit(BucketingSinkFaultToleranceITCase.java:182)
> {code}
[jira] [Commented] (FLINK-11418) Unable to build docs in Docker image
[ https://issues.apache.org/jira/browse/FLINK-11418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750221#comment-16750221 ]

Robert Metzger commented on FLINK-11418:

:) I have found a fix for the issue. Will open a PR soon.

> Unable to build docs in Docker image
>
> Key: FLINK-11418
> URL: https://issues.apache.org/jira/browse/FLINK-11418
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.8.0
> Reporter: Robert Metzger
> Priority: Major
>
> Running
> {code:java}
> cd flink/docs/docker
> ./run.sh{code}
>
> And then in the container
> {code:java}
> Welcome to Apache Flink docs
> To build, execute
> ./build_docs.sh
> To watch and regenerate automatically
> ./build_docs.sh -p
> and access http://localhost:4000
> bash-4.4$ ./build_docs.sh -p
> Traceback (most recent call last):
> 2: from /usr/local/bin/bundle:23:in `<main>'
> 1: from /usr/share/rubygems/rubygems.rb:308:in `activate_bin_path'
> /usr/share/rubygems/rubygems.rb:289:in `find_spec_for_exe': can't find gem bundler (>= 0.a) with executable bundle (Gem::GemNotFoundException){code}
> I believe there's something wrong.
[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r250282725

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
##
@@ -55,4 +57,13 @@
 	 * @return null or the target topic
 	 */
 	String getTargetTopic(T element);
+
+	/**
+	 *
+	 * @param element The incoming element to be serialized
+	 * @return collection of headers (maybe empty)
+	 */
+	default Iterable> headers(T element) {

Review comment:
I can try to use ProducerRecord by adding a factory method to KeyedSerializationSchema:
```
interface KeyedSerializationSchema<T> {
    ...
    default ProducerRecord<byte[], byte[]> newProducerRecord(T element, byte[] serializedKey, byte[] serializedValue, Integer partition, String topic, Long timestamp) {
        return new ProducerRecord<>(topic, partition, timestamp, serializedKey, serializedValue);
    }
}
```
And then in a producer:
```
String targetTopic = schema.getTargetTopic(next);
if (targetTopic == null) {
    targetTopic = defaultTopicId;
}
...
ProducerRecord<byte[], byte[]> record = schema.newProducerRecord(
    next,
    serializedKey,
    serializedValue,
    flinkKafkaPartitioner != null ? flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions) : null,
    targetTopic,
    timestamp);
...
```
Note that we can't deprecate getTargetTopic, serializeKey and serializeValue, because they are used by the partitioner as well. What do you think?

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
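To make the factory-method proposal concrete, here is a self-contained sketch. It deliberately does not compile against Kafka or Flink: `SketchRecord` is a hypothetical stand-in for Kafka's `ProducerRecord`, `SketchSchema` is a stand-in for `KeyedSerializationSchema`, and the `headers` field and `HeaderAwareSchema` are assumptions added only to show why a schema might want to override the default.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in for Kafka's ProducerRecord with byte[] key and value. */
final class SketchRecord {
    final String topic;
    final Integer partition;
    final Long timestamp;
    final byte[] key;
    final byte[] value;
    final Map<String, byte[]> headers;

    SketchRecord(String topic, Integer partition, Long timestamp,
                 byte[] key, byte[] value, Map<String, byte[]> headers) {
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
        this.headers = headers;
    }
}

/** Hypothetical stand-in for the schema interface, reduced to the factory method. */
interface SketchSchema<T> {
    /** Default behaviour: build a record without headers, as in the proposal. */
    default SketchRecord newProducerRecord(T element, byte[] serializedKey, byte[] serializedValue,
                                           Integer partition, String topic, Long timestamp) {
        return new SketchRecord(topic, partition, timestamp, serializedKey, serializedValue,
                Collections.emptyMap());
    }
}

/** A schema that overrides the factory to attach a header derived from the element. */
final class HeaderAwareSchema implements SketchSchema<String> {
    @Override
    public SketchRecord newProducerRecord(String element, byte[] serializedKey, byte[] serializedValue,
                                          Integer partition, String topic, Long timestamp) {
        byte[] length = Integer.toString(element.length()).getBytes(StandardCharsets.UTF_8);
        return new SketchRecord(topic, partition, timestamp, serializedKey, serializedValue,
                Collections.singletonMap("length", length));
    }
}
```

With this shape, existing schemas keep working through the default method, and only schemas that need headers (or any other record-level detail) override it.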
[jira] [Commented] (FLINK-11418) Unable to build docs in Docker image
[ https://issues.apache.org/jira/browse/FLINK-11418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750200#comment-16750200 ]

Chesnay Schepler commented on FLINK-11418:

oh, this is about the _flink_ docs, not flink-web :/

> Unable to build docs in Docker image
>
> Key: FLINK-11418
> URL: https://issues.apache.org/jira/browse/FLINK-11418
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.8.0
> Reporter: Robert Metzger
> Priority: Major
>
> Running
> {code:java}
> cd flink/docs/docker
> ./run.sh{code}
>
> And then in the container
> {code:java}
> Welcome to Apache Flink docs
> To build, execute
> ./build_docs.sh
> To watch and regenerate automatically
> ./build_docs.sh -p
> and access http://localhost:4000
> bash-4.4$ ./build_docs.sh -p
> Traceback (most recent call last):
> 2: from /usr/local/bin/bundle:23:in `<main>'
> 1: from /usr/share/rubygems/rubygems.rb:308:in `activate_bin_path'
> /usr/share/rubygems/rubygems.rb:289:in `find_spec_for_exe': can't find gem bundler (>= 0.a) with executable bundle (Gem::GemNotFoundException){code}
> I believe there's something wrong.
[jira] [Commented] (FLINK-11418) Unable to build docs in Docker image
[ https://issues.apache.org/jira/browse/FLINK-11418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750192#comment-16750192 ]

Robert Metzger commented on FLINK-11418:

Ah, I didn't know about these. The readme file in the "docs" directory did not mention them. I'll check them out.

> Unable to build docs in Docker image
>
> Key: FLINK-11418
> URL: https://issues.apache.org/jira/browse/FLINK-11418
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.8.0
> Reporter: Robert Metzger
> Priority: Major
>
> Running
> {code:java}
> cd flink/docs/docker
> ./run.sh{code}
>
> And then in the container
> {code:java}
> Welcome to Apache Flink docs
> To build, execute
> ./build_docs.sh
> To watch and regenerate automatically
> ./build_docs.sh -p
> and access http://localhost:4000
> bash-4.4$ ./build_docs.sh -p
> Traceback (most recent call last):
> 2: from /usr/local/bin/bundle:23:in `<main>'
> 1: from /usr/share/rubygems/rubygems.rb:308:in `activate_bin_path'
> /usr/share/rubygems/rubygems.rb:289:in `find_spec_for_exe': can't find gem bundler (>= 0.a) with executable bundle (Gem::GemNotFoundException){code}
> I believe there's something wrong.
[GitHub] azagrebin commented on a change in pull request #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base
azagrebin commented on a change in pull request #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base URL: https://github.com/apache/flink/pull/7526#discussion_r250231423 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java ## @@ -225,31 +206,82 @@ public static JobGraph createNonEmptyJobGraph() { return jobGraph; } - private static class HATestingDispatcher extends TestingDispatcher { + @Nonnull + private HATestingDispatcher createDispatcher(TestingHighAvailabilityServices highAvailabilityServices, Queue fencingTokens) throws Exception { + return createDispatcher(highAvailabilityServices, fencingTokens, new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null))); + } - @Nonnull - private final BlockingQueue fencingTokens; + @Nonnull + private HATestingDispatcher createDispatcher( + TestingHighAvailabilityServices highAvailabilityServices, + @Nullable Queue fencingTokens, + Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception { + final Configuration configuration = new Configuration(); - HATestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nonnull BlockingQueue fencingTokens) throws Exception { - super(rpcService, endpointId, configuration, highAvailabilityServices, resourceManagerGateway, blobServer, heartbeatServices, jobManagerMetricGroup, metricQueryServicePath, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler); - this.fencingTokens = fencingTokens; - 
} + return new HATestingDispatcher( + rpcService, + UUID.randomUUID().toString(), + configuration, + highAvailabilityServices, + new TestingResourceManagerGateway(), + new BlobServer(configuration, new VoidBlobStore()), + new HeartbeatServices(1000L, 1000L), + UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), + null, + new MemoryArchivedExecutionGraphStore(), + jobManagerRunnerFactory, + testingFatalErrorHandler, + fencingTokens); + } - @VisibleForTesting - CompletableFuture getNumberJobs(Time timeout) { - return callAsyncWithoutFencing( - () -> listJobs(timeout).get().size(), - timeout); + private static class HATestingDispatcher extends TestingDispatcher { + + @Nullable + private final Queue fencingTokens; + + HATestingDispatcher( + RpcService rpcService, + String endpointId, + Configuration configuration, + HighAvailabilityServices highAvailabilityServices, + ResourceManagerGateway resourceManagerGateway, + BlobServer blobServer, + HeartbeatServices heartbeatServices, + JobManagerMetricGroup jobManagerMetricGroup, + @Nullable String metricQueryServicePath, + ArchivedExecutionGraphStore archivedExecutionGraphStore, + JobManagerRunnerFactory jobManagerRunnerFactory, + FatalErrorHandler fatalErrorHandler, + @Nullable Queue fencingTokens) throws Exception { + super( + rpcService, + endpointId, + configuration, + highAvailabilityServices, + resourceManagerGateway, + blobServer, + heartbeatServices, + jobManagerMetricGroup, + metricQueryServicePath, + archivedExecutionGraphStore, +
[jira] [Assigned] (FLINK-11362) Check and port TaskManagerComponentsStartupShutdownTest to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

boshu Zheng reassigned FLINK-11362:

Assignee: (was: boshu Zheng)

> Check and port TaskManagerComponentsStartupShutdownTest to new code base if necessary
>
> Key: FLINK-11362
> URL: https://issues.apache.org/jira/browse/FLINK-11362
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Reporter: Till Rohrmann
> Priority: Major
>
> Check and port {{TaskManagerComponentsStartupShutdownTest}} to new code base if necessary.
[jira] [Created] (FLINK-11418) Unable to build docs in Docker image
Robert Metzger created FLINK-11418:

Summary: Unable to build docs in Docker image
Key: FLINK-11418
URL: https://issues.apache.org/jira/browse/FLINK-11418
Project: Flink
Issue Type: Bug
Components: Documentation
Affects Versions: 1.8.0
Reporter: Robert Metzger

Running
{code:java}
cd flink/docs/docker
./run.sh{code}

And then in the container
{code:java}
Welcome to Apache Flink docs
To build, execute
./build_docs.sh
To watch and regenerate automatically
./build_docs.sh -p
and access http://localhost:4000
bash-4.4$ ./build_docs.sh -p
Traceback (most recent call last):
2: from /usr/local/bin/bundle:23:in `<main>'
1: from /usr/share/rubygems/rubygems.rb:308:in `activate_bin_path'
/usr/share/rubygems/rubygems.rb:289:in `find_spec_for_exe': can't find gem bundler (>= 0.a) with executable bundle (Gem::GemNotFoundException){code}
I believe there's something wrong.
[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r250247173

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
##
@@ -55,4 +57,13 @@
 	 * @return null or the target topic
 	 */
 	String getTargetTopic(T element);
+
+	/**
+	 *
+	 * @param element The incoming element to be serialized
+	 * @return collection of headers (maybe empty)
+	 */
+	default Iterable> headers(T element) {

Review comment:
Not sure how calling one constructor can be easier than calling another... Map.Entry is more convenient when the element already represents its headers as a Map (or Guava's Multimap); for a Map, the implementation would simply be element.entrySet().
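The point about `Map.Entry` can be illustrated with a small sketch. The type parameters of `headers` are garbled in the diff above, so `Iterable<Map.Entry<String, byte[]>>` is an assumption here, and `EventWithHeaders` and `HeaderSketch` are hypothetical names invented for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical element type that already keeps its headers in a Map. */
final class EventWithHeaders {
    final String payload;
    final Map<String, byte[]> headers = new LinkedHashMap<>();

    EventWithHeaders(String payload) {
        this.payload = payload;
    }
}

final class HeaderSketch {

    /** Assumed shape of the proposed method: no conversion, the entry set is returned directly. */
    static Iterable<Map.Entry<String, byte[]>> headers(EventWithHeaders element) {
        return element.headers.entrySet();
    }

    public static void main(String[] args) {
        EventWithHeaders event = new EventWithHeaders("hello");
        event.headers.put("trace-id", "abc123".getBytes(StandardCharsets.UTF_8));
        for (Map.Entry<String, byte[]> header : headers(event)) {
            System.out.println(header.getKey() + " = "
                    + new String(header.getValue(), StandardCharsets.UTF_8));
        }
    }
}
```

Because `Map#entrySet()` is already an `Iterable` of entries, a schema for such an element needs no adapter type and allocates nothing per record.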
[GitHub] tillrohrmann commented on issue #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base
tillrohrmann commented on issue #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base
URL: https://github.com/apache/flink/pull/7546#issuecomment-456836215

As @zentol pointed out, it could work to use the metrics to query the number of restarts.
[GitHub] tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base
tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base URL: https://github.com/apache/flink/pull/7546#discussion_r250224836 ## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ## @@ -187,178 +188,146 @@ public void perJobYarnClusterOffHeap() throws IOException { } /** -* Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213). +* Starts a session cluster on YARN, and submits a streaming job. +* +* Tests +* +* if a custom YARN application name can be set from the command line, +* if the number of TaskManager slots can be set from the command line, +* if dynamic properties from the command line are set, +* if the vcores are set correctly (FLINK-2213), +* if jobmanager hostname/port are shown in web interface (FLINK-1902) +* +* +* Hint: If you think it is a good idea to add more assertions to this test, think again! */ - @Test(timeout = 10) // timeout after 100 seconds - public void testTaskManagerFailure() throws Exception { - assumeTrue("The new mode does not start TMs upfront.", !isNewMode); - LOG.info("Starting testTaskManagerFailure()"); - Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), - "-n", "1", + @Test(timeout = 100_000) + public void testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots() throws Exception { + final Runner yarnSessionClusterRunner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-jm", "768m", "-tm", "1024m", "-s", "3", // set the slots 3 to check if the vCores are set properly! 
"-nm", "customName", "-Dfancy-configuration-value=veryFancy", "-Dyarn.maximum-failed-containers=3", "-D" + YarnConfigOptions.VCORES.key() + "=2"}, - "Number of connected TaskManagers changed to 1. Slots available: 3", + "Flink JobManager is now running on ", RunTypes.YARN_SESSION); - Assert.assertEquals(2, getRunningContainers()); + final String logs = outContent.toString(); + final String hostname = parseJobManagerHostname(logs); + LOG.info("Extracted hostname: {}", hostname); - // Test if JobManager web interface is accessible --- + final ApplicationReport applicationReport = getOnlyApplicationReport(); + final String restApiBaseUrl = normalizeTrackingUrl(applicationReport.getTrackingUrl()); + LOG.info("Got application URL from YARN {}", restApiBaseUrl); - final YarnClient yc = YarnClient.createYarnClient(); - yc.init(YARN_CONFIGURATION); - yc.start(); + submitJob("WindowJoin.jar"); + waitForTaskManagerRegistration(restApiBaseUrl, Duration.ofMillis(30_000)); - List apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); - Assert.assertEquals(1, apps.size()); // Only one running - ApplicationReport app = apps.get(0); - Assert.assertEquals("customName", app.getName()); - String url = app.getTrackingUrl(); - if (!url.endsWith("/")) { - url += "/"; - } - if (!url.startsWith("http://;)) { - url = "http://; + url; - } - LOG.info("Got application URL from YARN {}", url); + // + // Assert that custom YARN application name "customName" is set + // + assertEquals("customName", applicationReport.getName()); - String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/"); + // + // Assert the number of TaskManager slots are set + // + assertNumberOfSlotsPerTask(restApiBaseUrl, 3); - JsonNode parsedTMs = new ObjectMapper().readTree(response); - ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers"); - Assert.assertNotNull(taskManagers); - Assert.assertEquals(1, taskManagers.size()); - Assert.assertEquals(3,
[GitHub] tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base
tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base URL: https://github.com/apache/flink/pull/7546#discussion_r250225736 ## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ## @@ -187,178 +188,146 @@ public void perJobYarnClusterOffHeap() throws IOException { } /** -* Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213). +* Starts a session cluster on YARN, and submits a streaming job. +* +* Tests +* +* if a custom YARN application name can be set from the command line, +* if the number of TaskManager slots can be set from the command line, +* if dynamic properties from the command line are set, +* if the vcores are set correctly (FLINK-2213), +* if jobmanager hostname/port are shown in web interface (FLINK-1902) +* +* +* Hint: If you think it is a good idea to add more assertions to this test, think again! */ - @Test(timeout = 10) // timeout after 100 seconds - public void testTaskManagerFailure() throws Exception { - assumeTrue("The new mode does not start TMs upfront.", !isNewMode); - LOG.info("Starting testTaskManagerFailure()"); - Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), - "-n", "1", + @Test(timeout = 100_000) + public void testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots() throws Exception { + final Runner yarnSessionClusterRunner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-jm", "768m", "-tm", "1024m", "-s", "3", // set the slots 3 to check if the vCores are set properly! 
"-nm", "customName", "-Dfancy-configuration-value=veryFancy", "-Dyarn.maximum-failed-containers=3", "-D" + YarnConfigOptions.VCORES.key() + "=2"}, - "Number of connected TaskManagers changed to 1. Slots available: 3", + "Flink JobManager is now running on ", RunTypes.YARN_SESSION); - Assert.assertEquals(2, getRunningContainers()); + final String logs = outContent.toString(); + final String hostname = parseJobManagerHostname(logs); + LOG.info("Extracted hostname: {}", hostname); - // Test if JobManager web interface is accessible --- + final ApplicationReport applicationReport = getOnlyApplicationReport(); + final String restApiBaseUrl = normalizeTrackingUrl(applicationReport.getTrackingUrl()); + LOG.info("Got application URL from YARN {}", restApiBaseUrl); - final YarnClient yc = YarnClient.createYarnClient(); - yc.init(YARN_CONFIGURATION); - yc.start(); + submitJob("WindowJoin.jar"); + waitForTaskManagerRegistration(restApiBaseUrl, Duration.ofMillis(30_000)); - List apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); - Assert.assertEquals(1, apps.size()); // Only one running - ApplicationReport app = apps.get(0); - Assert.assertEquals("customName", app.getName()); - String url = app.getTrackingUrl(); - if (!url.endsWith("/")) { - url += "/"; - } - if (!url.startsWith("http://;)) { - url = "http://; + url; - } - LOG.info("Got application URL from YARN {}", url); + // + // Assert that custom YARN application name "customName" is set + // + assertEquals("customName", applicationReport.getName()); - String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/"); + // + // Assert the number of TaskManager slots are set + // + assertNumberOfSlotsPerTask(restApiBaseUrl, 3); - JsonNode parsedTMs = new ObjectMapper().readTree(response); - ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers"); - Assert.assertNotNull(taskManagers); - Assert.assertEquals(1, taskManagers.size()); - Assert.assertEquals(3,
[GitHub] tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base
tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base URL: https://github.com/apache/flink/pull/7546#discussion_r250227506 ## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java ## @@ -124,24 +135,84 @@ public void testKillYarnSessionClusterEntrypoint() throws Exception { final ApplicationId id = restClusterClient.getClusterId(); - waitUntilJobIsRunning(restClusterClient, jobId, RETRY_TIMEOUT); + waitUntilJobIsRunning(restClusterClient, jobId); killApplicationMaster(yarnClusterDescriptor.getYarnSessionClusterEntrypoint()); + waitForApplicationAttempt(id, 2); + waitUntilJobIsRunning(restClusterClient, jobId); + + killApplicationAndWait(id); + } + + @Test + public void testJobRecoversAfterKillingTaskManager() throws Exception { + final RestClusterClient restClusterClient = deploySessionCluster(setupYarnClusterDescriptor()); + final JobID jobId = submitJob(restClusterClient, createJobGraph()); + final ApplicationId id = restClusterClient.getClusterId(); + waitUntilJobIsRunning(restClusterClient, jobId); + + stopTaskManagerContainer(); + waitUntilJobIsRunning(restClusterClient, jobId); Review comment: Can it happen that the JobManager does not realize that a `TM` has died before we call `waitUntilJobIsRunning`? Is there an easy way to check whether we have restarted the job? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
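One way to close the race raised in this review comment is to wait for a restart counter to move past its previous value before checking that the job is running again, in line with the suggestion elsewhere in this thread to query the number of restarts from the metrics. The sketch below assumes such a counter can be read somehow; `RestartWaiter` and `waitForRestart` are hypothetical names, and the `LongSupplier` stands in for whatever REST or metrics query the test would actually issue.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/** Hypothetical test helper: blocks until a restart counter exceeds a known baseline. */
final class RestartWaiter {

    /**
     * Polls the counter until it is greater than {@code baseline}.
     *
     * @return true if a restart was observed, false on timeout or interruption
     */
    static boolean waitForRestart(LongSupplier restartCounter, long baseline,
                                  Duration timeout, Duration pollInterval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (restartCounter.getAsLong() > baseline) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(Math.max(1L, pollInterval.toMillis()));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

A test would read the counter once before stopping the TaskManager container, stop it, call `waitForRestart` with that value as the baseline, and only then call `waitUntilJobIsRunning`.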
[jira] [Commented] (FLINK-11249) FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
[ https://issues.apache.org/jira/browse/FLINK-11249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750075#comment-16750075 ]

Piotr Nowojski commented on FLINK-11249:

Thanks for the response [~elevy]. That would be my guess & hope as well, but unfortunately we need to test for that.

> FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
>
> Key: FLINK-11249
> URL: https://issues.apache.org/jira/browse/FLINK-11249
> Project: Flink
> Issue Type: Sub-task
> Components: Kafka Connector
> Affects Versions: 1.7.0, 1.7.1
> Reporter: Piotr Nowojski
> Assignee: vinoyang
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.7.2, 1.8.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> As reported by a user on the mailing list "How to migrate Kafka Producer ?" (on 18th December 2018), {{FlinkKafkaProducer011}} can not be migrated to {{FlinkKafkaProducer}} and the same problem can occur in future Kafka producer versions/refactorings.
> The issue is that the {{ListState FlinkKafkaProducer#nextTransactionalIdHintState}} field is serialized using Java serializers and this is causing problems/collisions on {{FlinkKafkaProducer011.NextTransactionalIdHint}} vs {{FlinkKafkaProducer.NextTransactionalIdHint}}.
> To fix that we probably need to release new versions of those classes that will rewrite/upgrade this state field to a new one that doesn't rely on Java serialization. After this, we could drop support for the old field, and that in turn will allow users to upgrade from the 0.11 connector to the universal one.
> One bright side is that, technically speaking, our {{FlinkKafkaProducer011}} has the same compatibility matrix as the universal one (it's also forward & backward compatible with the same Kafka versions), so for the time being users can stick to {{FlinkKafkaProducer011}}.
> FYI [~tzulitai] [~yanghua]
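The reason Java serialization ties the stored state to one specific class can be shown in isolation: the serialized stream carries the fully qualified name of the class that was written. The sketch below is only an illustration of that mechanism; `Hint` is a hypothetical stand-in for a `NextTransactionalIdHint`-like class, and its field names are assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class JavaSerializationNameSketch {

    /** Hypothetical stand-in for a state class written with Java serialization. */
    static final class Hint implements Serializable {
        private static final long serialVersionUID = 1L;
        int lastParallelism;
        long nextFreeTransactionalId;
    }

    /** Serializes a value with plain Java serialization. */
    static byte[] javaSerialize(Serializable value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Checks whether the raw stream contains the given class name as text. */
    static boolean streamMentions(byte[] serialized, String className) {
        return new String(serialized, StandardCharsets.ISO_8859_1).contains(className);
    }

    public static void main(String[] args) {
        byte[] bytes = javaSerialize(new Hint());
        System.out.println("class name embedded: " + streamMentions(bytes, Hint.class.getName()));
    }
}
```

Since the name of the writing class is part of the bytes, state written by one producer's nested hint class is not readable as the identically shaped nested class of another producer, which is why the issue proposes moving the field off Java serialization.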
[GitHub] azagrebin commented on issue #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
azagrebin commented on issue #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#issuecomment-456828901

Thanks for the review @StefanRRichter! I activated this feature in the existing e2e test. I also extended the unit test to check that the snapshot is not affected by modifications which happen concurrently after triggering the snapshot. Please have a look if you have time.
[GitHub] igalshilman commented on issue #7557: [FLINK-11406] [core] Return INCOMPATIBLE when nested serializers arity don't match in CompositeTypeSerializerSnapshot
igalshilman commented on issue #7557: [FLINK-11406] [core] Return INCOMPATIBLE when nested serializers arity don't match in CompositeTypeSerializerSnapshot
URL: https://github.com/apache/flink/pull/7557#issuecomment-456826190

LGTM
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r250218492 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory; + +import java.util.Optional; + +abstract class RocksDBSnapshotTransformFactoryAdaptor implements StateSnapshotTransformFactory { + final StateSnapshotTransformFactory snapshotTransformFactory; + + RocksDBSnapshotTransformFactoryAdaptor(StateSnapshotTransformFactory snapshotTransformFactory) { + this.snapshotTransformFactory = snapshotTransformFactory; + } + + @Override + public Optional> createForDeserializedState() { + throw new UnsupportedOperationException("Only serialized state filtering is supported in RocksDB backend"); + } + + @SuppressWarnings("unchecked") + static StateSnapshotTransformFactory wrapStateSnapshotTransformFactory( + StateDescriptor stateDesc, + StateSnapshotTransformFactory snapshotTransformFactory) { + if (stateDesc instanceof ListStateDescriptor) { + TypeSerializer elementSerializer = ((ListStateDescriptor) stateDesc).getElementSerializer(); Review comment: Also if the `TypeSerializer` is stateful, we might run into problems because we are sharing the elementSerializer across all `StateSnapshotTransformers`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
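To illustrate the concern about sharing one stateful serializer across all transformers, here is a minimal sketch. Every class in it (`ScratchSerializer`, `ElementTransformer`, `TransformerFactorySketch`) is hypothetical and only stands in for the Flink types discussed above. The only assumption taken from Flink itself is that a serializer can hand out a copy of itself, as `TypeSerializer#duplicate()` does. Whether the PR should obtain the serializer this way is for the reviewers to decide.

```java
/** Hypothetical stateful serializer: it reuses a mutable scratch buffer between calls. */
class ScratchSerializer {
    private final StringBuilder scratch = new StringBuilder();

    String serialize(String element) {
        // Mutable state: unsafe if two threads call this on the same instance.
        scratch.setLength(0);
        scratch.append('<').append(element).append('>');
        return scratch.toString();
    }

    /** Returns an independent copy, in the spirit of a serializer duplicate() method. */
    ScratchSerializer duplicate() {
        return new ScratchSerializer();
    }
}

/** Hypothetical transformer that owns exactly one serializer instance. */
class ElementTransformer {
    private final ScratchSerializer serializer;

    ElementTransformer(ScratchSerializer serializer) {
        this.serializer = serializer;
    }

    ScratchSerializer serializer() {
        return serializer;
    }

    String transform(String element) {
        return serializer.serialize(element);
    }
}

/** Hypothetical factory: each created transformer gets its own serializer copy. */
class TransformerFactorySketch {
    private final ScratchSerializer prototype;

    TransformerFactorySketch(ScratchSerializer prototype) {
        this.prototype = prototype;
    }

    ElementTransformer create() {
        // Duplicate instead of sharing the prototype across transformers.
        return new ElementTransformer(prototype.duplicate());
    }

    public static void main(String[] args) {
        TransformerFactorySketch factory = new TransformerFactorySketch(new ScratchSerializer());
        ElementTransformer first = factory.create();
        ElementTransformer second = factory.create();
        System.out.println("shared serializer: " + (first.serializer() == second.serializer()));
    }
}
```

With one copy per transformer, two snapshots running in parallel never touch the same scratch buffer, at the cost of one extra serializer object per snapshot.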
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r250212255 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory; + +import java.util.Optional; + +abstract class RocksDBSnapshotTransformFactoryAdaptor implements StateSnapshotTransformFactory { + final StateSnapshotTransformFactory snapshotTransformFactory; + + RocksDBSnapshotTransformFactoryAdaptor(StateSnapshotTransformFactory snapshotTransformFactory) { + this.snapshotTransformFactory = snapshotTransformFactory; + } + + @Override + public Optional> createForDeserializedState() { + throw new UnsupportedOperationException("Only serialized state filtering is supported in RocksDB backend"); + } + + @SuppressWarnings("unchecked") + static StateSnapshotTransformFactory wrapStateSnapshotTransformFactory( + StateDescriptor stateDesc, + StateSnapshotTransformFactory snapshotTransformFactory) { + if (stateDesc instanceof ListStateDescriptor) { + TypeSerializer elementSerializer = ((ListStateDescriptor) stateDesc).getElementSerializer(); Review comment: I would like to hear @tzulitai opinion on the way we obtain the element serializer here. If I understood it correctly, then we should obtain it from the meta state information because it might be reconfigured. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r250210160 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.runtime.state.internal.InternalValueState; +import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; +import org.apache.flink.util.StringUtils; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.RunnableFuture; + +import static org.junit.Assert.assertEquals; + +class StateSnapshotTransformerTest { + private final AbstractKeyedStateBackend backend; + private final BlockerCheckpointStreamFactory streamFactory; + private final StateSnapshotTransformFactory snapshotTransformFactory; + + StateSnapshotTransformerTest( + AbstractKeyedStateBackend backend, + BlockerCheckpointStreamFactory streamFactory) { + + this.backend = backend; + this.streamFactory = streamFactory; + this.snapshotTransformFactory = SingleThreadAccessCheckingsnapshotTransformFactory.create(); + } + + void testNonConcurrentSnapshotTransformerAccess() throws Exception { + List testStates = Arrays.asList( + new TestValueState(), + new TestListState(), + new TestMapState() + ); + + for (TestState state : testStates) { + for (int i = 0; i < 100; i++) { + backend.setCurrentKey(i); + state.setToRandomValue(); + } + + CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation(); + + RunnableFuture> snapshot1 = + backend.snapshot(1L, 0L, 
streamFactory, checkpointOptions); + + RunnableFuture> snapshot2 = + backend.snapshot(2L, 0L, streamFactory, checkpointOptions); + + Thread runner1 = new Thread(snapshot1, "snapshot1"); + runner1.start(); + Thread runner2 = new Thread(snapshot2, "snapshot2"); + runner2.start(); + + runner1.join(); + runner2.join(); + + snapshot1.get(); + snapshot2.get(); + } + } + + private abstract class TestState { + final Random rnd; + + private TestState() { + this.rnd = new Random(); + } + + abstract void setToRandomValue() throws Exception; + + String getRandomString() { + return StringUtils.getRandomString(rnd, 5, 10); + } + } + + private class TestValueState extends TestState { + private final InternalValueState state; + + private TestValueState() throws Exception { + this.state = backend.createInternalState( + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>("TestValueState", StringSerializer.INSTANCE), +
[GitHub] asfgit closed pull request #7569: [hotfix] [javadocs] fix typo in JobManagerRunner
asfgit closed pull request #7569: [hotfix] [javadocs] fix typo in JobManagerRunner URL: https://github.com/apache/flink/pull/7569
[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry
[ https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750025#comment-16750025 ]

xueyu commented on FLINK-11046:
-------------------------------

Hi, [~dawidwys] [~tzulitai], I don't know how to make Elasticsearch 6 return a failed response and thereby trigger the onFailure callback in the e2e test Elasticsearch6SinkExample.java. Do you have any suggestions? Thanks. I tried the following, but I am not sure whether it is right:

{code:java}
private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
    Map json = new HashMap<>();
    json.put("data", element);
    if (element.startsWith("message #15")) {
        return Requests.indexRequest()
            .index(null)
            .type(null)
            .id(null)
            .source(element);
    } else {
        return Requests.indexRequest()
            .index(parameterTool.getRequired("index"))
            .type(parameterTool.getRequired("type"))
            .id(element)
            .source(json);
    }
}
{code}

> ElasticSearch6Connector cause thread blocked when index failed with retry
> -------------------------------------------------------------------------
>
> Key: FLINK-11046
> URL: https://issues.apache.org/jira/browse/FLINK-11046
> Project: Flink
> Issue Type: Bug
> Components: ElasticSearch Connector
> Affects Versions: 1.6.2
> Reporter: luoguohao
> Assignee: xueyu
> Priority: Major
>
> When I use the es6 sink to index into ES and the bulk process catches an exception, I try to reindex the document by calling `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` method, but this goes wrong. The calling thread gets stuck there, and in the thread dump I saw that the `bulkprocessor` object was locked by another thread.
> {code:java}
> public interface ActionRequestFailureHandler extends Serializable {
>     void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable;
> }
> {code}
> After reading the code behind `indexer.add(action)`, I found that each add operation is `synchronized`.
> {code:java}
> private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) {
>     ensureOpen();
>     bulkRequest.add(request, payload);
>     executeIfNeeded();
> }
> {code}
> I also noticed that the `bulkprocessor` object is locked in the bulk process thread as well.
> The bulk process operation is in the following code:
> {code:java}
> public void execute(BulkRequest bulkRequest, long executionId) {
>     Runnable toRelease = () -> {};
>     boolean bulkRequestSetupSuccessful = false;
>     try {
>         listener.beforeBulk(executionId, bulkRequest);
>         semaphore.acquire();
>         toRelease = semaphore::release;
>         CountDownLatch latch = new CountDownLatch(1);
>         retry.withBackoff(consumer, bulkRequest, new ActionListener() {
>             @Override
>             public void onResponse(BulkResponse response) {
>                 try {
>                     listener.afterBulk(executionId, bulkRequest, response);
>                 } finally {
>                     semaphore.release();
>                     latch.countDown();
>                 }
>             }
>             @Override
>             public void onFailure(Exception e) {
>                 try {
>                     listener.afterBulk(executionId, bulkRequest, e);
>                 } finally {
>                     semaphore.release();
>                     latch.countDown();
>                 }
>             }
>         }, Settings.EMPTY);
>         bulkRequestSetupSuccessful = true;
>         if (concurrentRequests == 0) {
>             latch.await();
>         }
>     } catch (InterruptedException e) {
>         Thread.currentThread().interrupt();
>         logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
>         listener.afterBulk(executionId, bulkRequest, e);
>     } catch (Exception e) {
>         logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
>         listener.afterBulk(executionId, bulkRequest, e);
>     } finally {
>         if (bulkRequestSetupSuccessful == false) { // if we fail on client.bulk() release the semaphore
>             toRelease.run();
>         }
>     }
> }
> {code}
> As the red line I marked above shows, I think that is why the retry operation thread was blocked, because the bulk
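The blocking pattern described in this report can be reproduced in isolation. The sketch below is a minimal model under stated assumptions: `BulkProcessorSketch` and its methods are hypothetical and are not the Elasticsearch `BulkProcessor`. It assumes only what the report states, namely that every add is `synchronized` on the processor and that the flushing thread waits on a latch that is released after the failure callback has run. A timeout replaces the unbounded `latch.await()` so that the demonstration terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a bulk processor; NOT the Elasticsearch class. */
class BulkProcessorSketch {
    private final List<String> buffer = new ArrayList<>();

    /** As in the report, every add needs the processor's monitor. */
    synchronized void add(String doc) {
        buffer.add(doc);
    }

    synchronized int pending() {
        return buffer.size();
    }

    /** Waits for the failure callback while still holding the monitor. */
    synchronized boolean flushHoldingLock(long timeoutMillis) {
        return awaitCallback(drain(), timeoutMillis);
    }

    /** Holds the monitor only while draining, then waits without it. */
    boolean flushOutsideLock(long timeoutMillis) {
        List<String> batch;
        synchronized (this) {
            batch = drain();
        }
        return awaitCallback(batch, timeoutMillis);
    }

    /** Caller must hold the monitor. */
    private List<String> drain() {
        List<String> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }

    /** Runs a failure callback that re-adds every document; true if it finished in time. */
    private boolean awaitCallback(List<String> batch, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        Thread callback = new Thread(() -> {
            for (String doc : batch) {
                add(doc); // needs the monitor, like indexer.add(action) in onFailure
            }
            latch.countDown();
        }, "failure-callback");
        callback.setDaemon(true);
        callback.start();
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BulkProcessorSketch blocked = new BulkProcessorSketch();
        blocked.add("doc-1");
        System.out.println("callback finished while lock held: " + blocked.flushHoldingLock(200));

        BulkProcessorSketch free = new BulkProcessorSketch();
        free.add("doc-1");
        System.out.println("callback finished without lock: " + free.flushOutsideLock(5000));
    }
}
```

In `flushHoldingLock` the callback cannot enter `add` until the flushing thread leaves the monitor, and the flushing thread does not leave until the latch opens or the wait times out. With an unbounded wait, as in the quoted `execute` method, neither side can make progress.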
[jira] [Resolved] (FLINK-11348) Port YARNSessionCapacitySchedulerITCase#testClientStartup to new code base
[ https://issues.apache.org/jira/browse/FLINK-11348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann resolved FLINK-11348.
----------------------------------
Resolution: Fixed
Fix Version/s: 1.8.0

Fixed via https://github.com/apache/flink/commit/3f0b1dfc4da548b166bc3e6aaf4d694c52ddda7b

> Port YARNSessionCapacitySchedulerITCase#testClientStartup to new code base
> --------------------------------------------------------------------------
>
> Key: FLINK-11348
> URL: https://issues.apache.org/jira/browse/FLINK-11348
> Project: Flink
> Issue Type: Sub-task
> Reporter: vinoyang
> Assignee: Gary Yao
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.8.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Port {{YARNSessionCapacitySchedulerITCase#testClientStartup}}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11409) Make `ProcessFunction`, `ProcessWindowFunction` and etc. pure interfaces
[ https://issues.apache.org/jira/browse/FLINK-11409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kezhu Wang updated FLINK-11409: --- Description: I found these functions express no opinionated demands from implementing classes. It would be nice to implement as interfaces not abstract classes as abstract class is intrusive and hampers caller user cases. For example, client can't write an `AbstractFlinkRichFunction` to unify lifecycle management for all data processing functions in easy way. I dive history of some of these functions, and find that some functions were converted as abstract class from interface due to default method implementation, such as `ProcessFunction` and `CoProcessFunction` were converted to abstract classes in FLINK-4460 which predate -FLINK-72+42+-. After -FLINK-72+42+-, [Java 8 default method|https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html] would be a better solution. I notice also that some functions which are introduced after -FLINK-72+42+-, such as `ProcessJoinFunction`, are implemented as abstract classes. I think it would be better to establish a well-known principle to guide both api authors and callers of data processing functions. Personally, I prefer interface for all exported function callbacks for the reason I express in first paragraph. Besides this, with `AbstractRichFunction` and interfaces for data processing functions I think lots of rich data processing functions can be eliminated as they are plain classes extending `AbstractRichFunction` and implementing data processing interfaces, clients can write this in one line code with clear intention of both data processing and lifecycle management. 
Following is a possible incomplete list of data processing functions implemented as abstract classes currently: * `ProcessFunction`, `KeyedProcessFunction`, `CoProcessFunction` and `ProcessJoinFunction` * `ProcessWindowFunction` and `ProcessAllWindowFunction` * `BaseBroadcastProcessFunction`, `BroadcastProcessFunction` and `KeyedBroadcastProcessFunction` All above functions are annotated with `@PublicEvolving`, making they interfaces won't break Flink's compatibility guarantee but compatibility is still a big consideration to evaluate this proposal. Any thoughts on this proposal ? Please must comment out. was: I found these functions express no opinionated demands from implementing classes. It would be nice to implement as interfaces not abstract classes as abstract class is intrusive and hampers caller user cases. For example, client can't write an `AbstractFlinkRichFunction` to unify lifecycle management for all data processing functions in easy way. I dive history of some of these functions, and find that some functions were converted as abstract class from interface due to default method implementation, such as `ProcessFunction` and `CoProcessFunction` were converted to abstract classes in FLINK-4460 which predate FLINK-7274. After FLINK-7274, [Java 8 default method|https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html] would be a better solution. I notice also that some functions which are introduced after FLINK-7274, such as `ProcessJoinFunction`, are implemented as abstract classes. I think it would be better to establish a well-known principle to guide both api authors and callers of data processing functions. Personally, I prefer interface for all exported function callbacks for the reason I express in first paragraph. 
Besides this, with `AbstractRichFunction` and interfaces for data processing functions I think lots of rich data processing functions can be eliminated as they are plain classes extending `AbstractRichFunction` and implementing data processing interfaces, clients can write this in one line code with clear intention of both data processing and lifecycle management. Following is a possible incomplete list of data processing functions implemented as abstract classes currently: * `ProcessFunction`, `KeyedProcessFunction`, `CoProcessFunction` and `ProcessJoinFunction` * `ProcessWindowFunction` and `ProcessAllWindowFunction` * `BaseBroadcastProcessFunction`, `BroadcastProcessFunction` and `KeyedBroadcastProcessFunction` All above functions are annotated with `@PublicEvolving`, making they interfaces won't break Flink's compatibility guarantee but compatibility is still a big consideration to evaluate this proposal. Any thoughts on this proposal ? Please must comment out. > Make `ProcessFunction`, `ProcessWindowFunction` and etc. pure interfaces > > > Key: FLINK-11409 > URL: https://issues.apache.org/jira/browse/FLINK-11409 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Kezhu Wang >Priority: Major
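The proposal above can be sketched in a few lines. This is only an illustration under stated assumptions: `SimpleProcessFunction`, `AbstractLifecycleFunction` and `UpperCaseFunction` are hypothetical names, not Flink's classes, and the signatures are far simpler than the real ones. The sketch shows a callback interface whose optional method has a Java 8 default implementation, combined with a separate abstract class that carries only lifecycle management.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** Hypothetical processing callback as a pure interface. */
interface SimpleProcessFunction<I, O> {
    void processElement(I value, List<O> out);

    /** Optional hook; the default method makes overriding it unnecessary. */
    default void onTimer(long timestamp, List<O> out) {
        // no-op by default
    }
}

/** Hypothetical lifecycle base class, independent of any processing interface. */
abstract class AbstractLifecycleFunction {
    private boolean open;

    void open() {
        open = true;
    }

    void close() {
        open = false;
    }

    boolean isOpen() {
        return open;
    }
}

/** One declaration combines lifecycle management and data processing. */
class UpperCaseFunction extends AbstractLifecycleFunction
        implements SimpleProcessFunction<String, String> {
    @Override
    public void processElement(String value, List<String> out) {
        out.add(value.toUpperCase(Locale.ROOT));
    }
}

class DefaultMethodSketch {
    static List<String> run(List<String> input) {
        UpperCaseFunction function = new UpperCaseFunction();
        List<String> out = new ArrayList<>();
        function.open();
        try {
            for (String value : input) {
                function.processElement(value, out);
            }
            function.onTimer(0L, out); // inherited default, does nothing
        } finally {
            function.close();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b")));
    }
}
```

Because the callback is an interface, the user keeps the single superclass slot free for a lifecycle base class of their own choosing, which is the use case the description gives for `AbstractFlinkRichFunction`.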
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r250206990 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotTransformers.java ## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import static org.apache.flink.runtime.state.StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy.STOP_ON_FIRST_INCLUDED; + +/** Collection of common state snapshot transformers and their factories. */ +public class StateSnapshotTransformers { Review comment: Alright, makes sense. Thanks for the clarification.
[GitHub] rmetzger commented on issue #7569: [hotfix] [javadocs] fix typo in JobManagerRunner
rmetzger commented on issue #7569: [hotfix] [javadocs] fix typo in JobManagerRunner URL: https://github.com/apache/flink/pull/7569#issuecomment-456811678 Thank you. Will merge.
[jira] [Updated] (FLINK-11409) Make `ProcessFunction`, `ProcessWindowFunction` and etc. pure interfaces
[ https://issues.apache.org/jira/browse/FLINK-11409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kezhu Wang updated FLINK-11409: --- Description: I found these functions express no opinionated demands from implementing classes. It would be nice to implement as interfaces not abstract classes as abstract class is intrusive and hampers caller user cases. For example, client can't write an `AbstractFlinkRichFunction` to unify lifecycle management for all data processing functions in easy way. I dive history of some of these functions, and find that some functions were converted as abstract class from interface due to default method implementation, such as `ProcessFunction` and `CoProcessFunction` were converted to abstract classes in FLINK-4460 which predate -FLINK-7242-. After -FLINK-7242-, [Java 8 default method|https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html] would be a better solution. I notice also that some functions which are introduced after -FLINK-7242-, such as `ProcessJoinFunction`, are implemented as abstract classes. I think it would be better to establish a well-known principle to guide both api authors and callers of data processing functions. Personally, I prefer interface for all exported function callbacks for the reason I express in first paragraph. Besides this, with `AbstractRichFunction` and interfaces for data processing functions I think lots of rich data processing functions can be eliminated as they are plain classes extending `AbstractRichFunction` and implementing data processing interfaces, clients can write this in one line code with clear intention of both data processing and lifecycle management. 
Following is a possible incomplete list of data processing functions implemented as abstract classes currently: * `ProcessFunction`, `KeyedProcessFunction`, `CoProcessFunction` and `ProcessJoinFunction` * `ProcessWindowFunction` and `ProcessAllWindowFunction` * `BaseBroadcastProcessFunction`, `BroadcastProcessFunction` and `KeyedBroadcastProcessFunction` All above functions are annotated with `@PublicEvolving`, making they interfaces won't break Flink's compatibility guarantee but compatibility is still a big consideration to evaluate this proposal. Any thoughts on this proposal ? Please must comment out. was: I found these functions express no opinionated demands from implementing classes. It would be nice to implement as interfaces not abstract classes as abstract class is intrusive and hampers caller user cases. For example, client can't write an `AbstractFlinkRichFunction` to unify lifecycle management for all data processing functions in easy way. I dive history of some of these functions, and find that some functions were converted as abstract class from interface due to default method implementation, such as `ProcessFunction` and `CoProcessFunction` were converted to abstract classes in FLINK-4460 which predate -FLINK-72+42+-. After -FLINK-72+42+-, [Java 8 default method|https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html] would be a better solution. I notice also that some functions which are introduced after -FLINK-72+42+-, such as `ProcessJoinFunction`, are implemented as abstract classes. I think it would be better to establish a well-known principle to guide both api authors and callers of data processing functions. Personally, I prefer interface for all exported function callbacks for the reason I express in first paragraph. 
Besides this, with `AbstractRichFunction` and interfaces for data processing functions I think lots of rich data processing functions can be eliminated as they are plain classes extending `AbstractRichFunction` and implementing data processing interfaces, clients can write this in one line code with clear intention of both data processing and lifecycle management. Following is a possible incomplete list of data processing functions implemented as abstract classes currently: * `ProcessFunction`, `KeyedProcessFunction`, `CoProcessFunction` and `ProcessJoinFunction` * `ProcessWindowFunction` and `ProcessAllWindowFunction` * `BaseBroadcastProcessFunction`, `BroadcastProcessFunction` and `KeyedBroadcastProcessFunction` All above functions are annotated with `@PublicEvolving`, making they interfaces won't break Flink's compatibility guarantee but compatibility is still a big consideration to evaluate this proposal. Any thoughts on this proposal ? Please must comment out. > Make `ProcessFunction`, `ProcessWindowFunction` and etc. pure interfaces > > > Key: FLINK-11409 > URL: https://issues.apache.org/jira/browse/FLINK-11409 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Kezhu Wang >Priority:
[GitHub] tillrohrmann merged pull request #7538: [FLINK-11348][tests] Port testClientStartup to new codebase
tillrohrmann merged pull request #7538: [FLINK-11348][tests] Port testClientStartup to new codebase URL: https://github.com/apache/flink/pull/7538
[GitHub] tillrohrmann commented on issue #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base
tillrohrmann commented on issue #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base URL: https://github.com/apache/flink/pull/7526#issuecomment-456809149 Thanks for the review @azagrebin. I've addressed your comments and moved `testPersistedJobGraphWhenDispatcherIsShutDown` to `DispatcherTest` since it is not HA related.
[GitHub] tillrohrmann commented on a change in pull request #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base
tillrohrmann commented on a change in pull request #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base URL: https://github.com/apache/flink/pull/7526#discussion_r250202116 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java ## @@ -225,13 +243,78 @@ public static JobGraph createNonEmptyJobGraph() { return jobGraph; } - private static class HATestingDispatcher extends TestingDispatcher { + @Test + public void testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception { + final JobGraph jobGraph = createNonEmptyJobGraph(); + final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); - @Nonnull - private final BlockingQueue fencingTokens; + final InMemorySubmittedJobGraphStore submittedJobGraphStore = new InMemorySubmittedJobGraphStore(); + highAvailabilityServices.setSubmittedJobGraphStore(submittedJobGraphStore); + + final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); + highAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService); + + highAvailabilityServices.setJobMasterLeaderElectionService(jobGraph.getJobID(), new TestingLeaderElectionService()); + highAvailabilityServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService()); + highAvailabilityServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); - HATestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nonnull BlockingQueue fencingTokens) throws Exception { - 
super(rpcService, endpointId, configuration, highAvailabilityServices, resourceManagerGateway, blobServer, heartbeatServices, jobManagerMetricGroup, metricQueryServicePath, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler); + final HATestingDispatcher dispatcher = createDispatcher( Review comment: I think you're right and the test should be moved to `DispatcherTest` because it is not HA specific. Moreover, we should move `getNumberJobs` into `TestingDispatcher`. The annotation `VisibleForTesting` is not necessary since `HATestingDispatcher` is a testing class. Will remove it while moving the method over to `TestingDispatcher`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base
tillrohrmann commented on a change in pull request #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base URL: https://github.com/apache/flink/pull/7526#discussion_r250198790 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java ## @@ -225,13 +243,78 @@ public static JobGraph createNonEmptyJobGraph() { return jobGraph; } - private static class HATestingDispatcher extends TestingDispatcher { + @Test + public void testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception { + final JobGraph jobGraph = createNonEmptyJobGraph(); + final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); - @Nonnull - private final BlockingQueue fencingTokens; + final InMemorySubmittedJobGraphStore submittedJobGraphStore = new InMemorySubmittedJobGraphStore(); + highAvailabilityServices.setSubmittedJobGraphStore(submittedJobGraphStore); + + final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); + highAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService); + + highAvailabilityServices.setJobMasterLeaderElectionService(jobGraph.getJobID(), new TestingLeaderElectionService()); + highAvailabilityServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService()); + highAvailabilityServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); - HATestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nonnull BlockingQueue fencingTokens) throws Exception { - 
super(rpcService, endpointId, configuration, highAvailabilityServices, resourceManagerGateway, blobServer, heartbeatServices, jobManagerMetricGroup, metricQueryServicePath, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler); + final HATestingDispatcher dispatcher = createDispatcher( + highAvailabilityServices, + Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE); + + dispatcher.start(); + + try { + // grant leadership and submit a single job + final DispatcherId expectedDispatcherId = DispatcherId.generate(); + leaderElectionService.isLeader(expectedDispatcherId.toUUID()).get(); + + final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + final CompletableFuture submissionFuture = dispatcherGateway.submitJob(jobGraph, timeout); + submissionFuture.get(); + assertThat(dispatcher.getNumberJobs(timeout).get(), is(1)); + + dispatcher.shutDown(); + dispatcher.getTerminationFuture().get(); + + assertThat(submittedJobGraphStore.contains(jobGraph.getJobID()), is(true)); Review comment: No it won't. But in the non-HA mode, we usually instantiate a `StandaloneSubmittedJobGraphStore` which is a no-op class. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
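To illustrate the review comment above: a minimal sketch of why an assertion such as `contains(jobId)` can only hold against a store that actually persists entries. The names `JobGraphStore`, `InMemoryStore` and `NoOpStore` are hypothetical stand-ins and not Flink's real classes or signatures; the no-op variant only mirrors the behaviour the comment attributes to `StandaloneSubmittedJobGraphStore`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class JobGraphStoreSketch {

    /** Hypothetical, simplified store interface (not Flink's actual API). */
    public interface JobGraphStore {
        void put(String jobId, String jobGraph);

        boolean contains(String jobId);
    }

    /** Keeps submitted graphs in memory, so they survive a dispatcher shutdown in a test. */
    public static final class InMemoryStore implements JobGraphStore {
        private final Map<String, String> graphs = new ConcurrentHashMap<>();

        @Override
        public void put(String jobId, String jobGraph) {
            graphs.put(jobId, jobGraph);
        }

        @Override
        public boolean contains(String jobId) {
            return graphs.containsKey(jobId);
        }
    }

    /** Assumed no-op behaviour: nothing is stored, so nothing can be found later. */
    public static final class NoOpStore implements JobGraphStore {
        @Override
        public void put(String jobId, String jobGraph) {
            // intentionally empty: non-HA setups have nothing to recover from
        }

        @Override
        public boolean contains(String jobId) {
            return false;
        }
    }

    public static void main(String[] args) {
        JobGraphStore inMemory = new InMemoryStore();
        JobGraphStore noOp = new NoOpStore();

        inMemory.put("job-1", "graph");
        noOp.put("job-1", "graph");

        System.out.println("in-memory contains job-1: " + inMemory.contains("job-1"));
        System.out.println("no-op contains job-1: " + noOp.contains("job-1"));
    }
}
```

Under these assumptions, a test that checks persistence after shutdown has to inject the in-memory variant explicitly, which is what the diff above does with `InMemorySubmittedJobGraphStore`.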
[GitHub] tillrohrmann commented on a change in pull request #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base
tillrohrmann commented on a change in pull request #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base URL: https://github.com/apache/flink/pull/7526#discussion_r250197888 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java ## @@ -150,8 +154,22 @@ public void testGrantingRevokingLeadership() throws Exception { } @Nonnull - private HATestingDispatcher createHADispatcher(TestingHighAvailabilityServices highAvailabilityServices, BlockingQueue fencingTokens) throws Exception { + private HATestingDispatcher createDispatcher(TestingHighAvailabilityServices highAvailabilityServices, Queue fencingTokens) throws Exception { Review comment: Good point. Will do it.
[GitHub] tillrohrmann commented on issue #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base
tillrohrmann commented on issue #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base URL: https://github.com/apache/flink/pull/7541#issuecomment-456803334 Thanks for the review @GJL. I've addressed your comments.
[GitHub] tillrohrmann commented on a change in pull request #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base
tillrohrmann commented on a change in pull request #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base URL: https://github.com/apache/flink/pull/7541#discussion_r250195669 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerTest.java ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.OperatingSystem; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; + +/** + * Tests for the {@link BlobServer}. + */ +public class BlobServerTest extends TestLogger { + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** +* Tests that the {@link BlobServer} fails if the blob storage directory +* cannot be created. 
+*/ + @Test + public void testFailureIfStorageDirectoryCannotBeCreated() throws IOException { + assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows. Review comment: Good point Gary. I will move the assumption into the `createNonWritableDirectory` method and change it to `assumeFalse`.
[jira] [Closed] (FLINK-11355) Check and port JobManagerProcessReapingTest if necessary to new code base
[ https://issues.apache.org/jira/browse/FLINK-11355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-11355. - Resolution: Fixed Fix Version/s: 1.8.0 Fixed via https://github.com/apache/flink/commit/1c4df9954e2ba103f09c4160ee918b625c8dc10d > Check and port JobManagerProcessReapingTest if necessary to new code base > - > > Key: FLINK-11355 > URL: https://issues.apache.org/jira/browse/FLINK-11355 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Check and port {{JobManagerProcessReapingTest}} if necessary to new code base. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann commented on a change in pull request #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base
tillrohrmann commented on a change in pull request #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base URL: https://github.com/apache/flink/pull/7541#discussion_r250194572 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java ## @@ -360,4 +368,32 @@ public void testConcurrentActorSystemCreation() throws Exception { ExecutorUtils.gracefulShutdown(1L, TimeUnit.MILLISECONDS, executorService); } } + + /** +* Tests that the {@link ActorSystem} fails with an expressive exception if it cannot be +* instantiated due to an occupied port. +*/ + @Test + public void testActorSystemInstantiationFailureWhenPortOccupied() throws Exception { + ServerSocket portOccupier; + final int port; + + try { + port = NetUtils.getAvailablePort(); + portOccupier = new ServerSocket(port, 10, InetAddress.getByName("0.0.0.0")); + } + catch (Throwable t) { + // could not find free port, or open a connection there Review comment: I think you're right that this is not necessary if we don't preselect the `port`. Instead, we should let the `ServerSocket` pick a port via `ServerSocket(0,...)`. Will change it accordingly.
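The `ServerSocket(0,...)` approach mentioned in the review comment can be sketched as follows. This is a minimal illustration using only the JDK; the class name `EphemeralPortExample` and the helpers `occupyFreePort` and `canBind` are hypothetical and are not part of the Flink test under review.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

public class EphemeralPortExample {

    /**
     * Binds to port 0 so the operating system picks a free port. The socket
     * holds that port for as long as it stays open, so there is no window
     * between "find a free port" and "occupy it".
     */
    public static ServerSocket occupyFreePort() throws IOException {
        return new ServerSocket(0, 10, InetAddress.getByName("0.0.0.0"));
    }

    /** Returns true if a second listening socket can be bound to the given port. */
    public static boolean canBind(int port) {
        try (ServerSocket ignored = new ServerSocket(port, 10, InetAddress.getByName("0.0.0.0"))) {
            return true;
        } catch (IOException e) {
            // typically a BindException because the port is already in use
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket portOccupier = occupyFreePort()) {
            // getLocalPort() reports the port the OS actually assigned
            int port = portOccupier.getLocalPort();
            System.out.println("occupied port: " + port);
            System.out.println("second bind possible: " + canBind(port));
        }
    }
}
```

Compared with asking for an available port first and binding afterwards, this avoids the race in which another process grabs the port between the two calls, so the surrounding `try`/`catch` for "could not find free port" is no longer needed for that reason.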
[GitHub] tillrohrmann merged pull request #7540: [FLINK-11355][tests] Remove JobManagerProcessReapingTest
tillrohrmann merged pull request #7540: [FLINK-11355][tests] Remove JobManagerProcessReapingTest URL: https://github.com/apache/flink/pull/7540
[GitHub] tillrohrmann commented on issue #7540: [FLINK-11355][tests] Remove JobManagerProcessReapingTest
tillrohrmann commented on issue #7540: [FLINK-11355][tests] Remove JobManagerProcessReapingTest URL: https://github.com/apache/flink/pull/7540#issuecomment-456800082 Thanks for your review @azagrebin. Merging this PR.
[GitHub] tillrohrmann closed pull request #7524: [FLINK-11351][tests] Port JobManagerCleanupITCase to new code base
tillrohrmann closed pull request #7524: [FLINK-11351][tests] Port JobManagerCleanupITCase to new code base URL: https://github.com/apache/flink/pull/7524
[jira] [Closed] (FLINK-11351) Port JobManagerCleanupITCase to new code base
[ https://issues.apache.org/jira/browse/FLINK-11351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-11351. - Resolution: Fixed Fix Version/s: 1.8.0 Fixed via 960feb4437c439084cfb317dd127645cfd176578 > Port JobManagerCleanupITCase to new code base > - > > Key: FLINK-11351 > URL: https://issues.apache.org/jira/browse/FLINK-11351 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Port {{JobManagerCleanupITCase}} to new code base.
[GitHub] tillrohrmann commented on issue #7524: [FLINK-11351][tests] Port JobManagerCleanupITCase to new code base
tillrohrmann commented on issue #7524: [FLINK-11351][tests] Port JobManagerCleanupITCase to new code base URL: https://github.com/apache/flink/pull/7524#issuecomment-456799536 Thanks for the review @GJL. Merging this PR.