[jira] [Commented] (FLINK-10460) DataDog reporter JsonMappingException

2019-01-23 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750837#comment-16750837
 ] 

lining commented on FLINK-10460:


[~phoenixjiangnan] see the code in 
org.apache.flink.metrics.datadog.DatadogHttpReporter#report: when an exception 
occurs, the key is removed from the map while it is still being iterated, 
which causes the java.util.ConcurrentModificationException.
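
A minimal, self-contained sketch of that failure mode (illustrative only, not the reporter's actual code): removing entries from a map while its key set is being iterated fails fast with the same exception.
{code:java}
import java.util.HashMap;
import java.util.Map;

public class CmeDemo {
	public static void main(String[] args) {
		Map<String, Integer> gauges = new HashMap<>();
		gauges.put("a", 1);
		gauges.put("b", 2);

		// Removing through the map (not through the iterator) while the
		// key set is being iterated makes the next iterator step fail fast.
		for (String key : gauges.keySet()) {
			gauges.remove(key); // throws ConcurrentModificationException
		}
	}
}
{code}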

> DataDog reporter JsonMappingException
> -
>
> Key: FLINK-10460
> URL: https://issues.apache.org/jira/browse/FLINK-10460
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: Elias Levy
>Priority: Minor
>
> Observed the following error in the TM logs this morning:
> {code:java}
> WARN  org.apache.flink.metrics.datadog.DatadogHttpReporter  - Failed 
> reporting metrics to Datadog.
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
>  (was java.util.ConcurrentModificationException) (through reference chain: 
> org.apache.flink.metrics.datadog.DSeries["series"]->
> java.util.ArrayList[88]->org.apache.flink.metrics.datadog.DGauge["points"])
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:379)
>at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:339)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:342)
>at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:686)
>at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157)
>at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
>at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:672)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:678)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157)
>at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
>at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3631)
>at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2998)
>at 
> org.apache.flink.metrics.datadog.DatadogHttpClient.serialize(DatadogHttpClient.java:90)
>at 
> org.apache.flink.metrics.datadog.DatadogHttpClient.send(DatadogHttpClient.java:79)
>at 
> org.apache.flink.metrics.datadog.DatadogHttpReporter.report(DatadogHttpReporter.java:143)
>   at 
> org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:417)
>at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
>at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown
>  Source)
>at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>  Source)
>at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>at java.lang.Thread.run(Unknown Source)
>  Caused by: java.util.ConcurrentModificationException
>at java.util.LinkedHashMap$LinkedHashIterator.nextNode(Unknown Source)
>at java.util.LinkedHashMap$LinkedKeyIterator.next(Unknown Source)
>at java.util.AbstractCollection.addAll(Unknown Source)
>at java.util.HashSet.<init>(Unknown Source)
>at 
> org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:65)
>at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:298)
>at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:906)
> 

[jira] [Closed] (FLINK-11356) Check and port JobManagerStartupTest to new code base if necessary

2019-01-23 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-11356.
-
   Resolution: Fixed
Fix Version/s: 1.8.0

Fixed via bb64862b8b7e3ee35625bfe8cb003e8eada8e949

> Check and port JobManagerStartupTest to new code base if necessary
> --
>
> Key: FLINK-11356
> URL: https://issues.apache.org/jira/browse/FLINK-11356
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Check and port {{JobManagerStartupTest}} to new code base if necessary.





[GitHub] tillrohrmann closed pull request #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base

2019-01-23 Thread GitBox
tillrohrmann closed pull request #7541: [FLINK-11356][tests] Port 
JobManagerStartupTest to new code base
URL: https://github.com/apache/flink/pull/7541
 
 
   




[GitHub] tillrohrmann commented on a change in pull request #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7526: [FLINK-11353][tests] 
Port JobManagerHAJobGraphRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7526#discussion_r250494997
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
 ##
 @@ -225,31 +206,82 @@ public static JobGraph createNonEmptyJobGraph() {
return jobGraph;
}
 
-   private static class HATestingDispatcher extends TestingDispatcher {
+   @Nonnull
+   private HATestingDispatcher 
createDispatcher(TestingHighAvailabilityServices highAvailabilityServices, Queue<DispatcherId> fencingTokens) throws Exception {
+   return createDispatcher(highAvailabilityServices, 
fencingTokens, new TestingJobManagerRunnerFactory(new CompletableFuture<>(), 
new CompletableFuture<>(), CompletableFuture.completedFuture(null)));
+   }
 
-   @Nonnull
-   private final BlockingQueue<DispatcherId> fencingTokens;
+   @Nonnull
+   private HATestingDispatcher createDispatcher(
+   TestingHighAvailabilityServices highAvailabilityServices,
+   @Nullable Queue<DispatcherId> fencingTokens,
+   Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) 
throws Exception {
+   final Configuration configuration = new Configuration();
 
-   HATestingDispatcher(RpcService rpcService, String endpointId, 
Configuration configuration, HighAvailabilityServices highAvailabilityServices, 
ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, 
HeartbeatServices heartbeatServices, JobManagerMetricGroup 
jobManagerMetricGroup, @Nullable String metricQueryServicePath, 
ArchivedExecutionGraphStore archivedExecutionGraphStore, 
JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler 
fatalErrorHandler, @Nonnull BlockingQueue<DispatcherId> fencingTokens) throws 
Exception {
-   super(rpcService, endpointId, configuration, 
highAvailabilityServices, resourceManagerGateway, blobServer, 
heartbeatServices, jobManagerMetricGroup, metricQueryServicePath, 
archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler);
-   this.fencingTokens = fencingTokens;
-   }
+   return new HATestingDispatcher(
+   rpcService,
+   UUID.randomUUID().toString(),
+   configuration,
+   highAvailabilityServices,
+   new TestingResourceManagerGateway(),
+   new BlobServer(configuration, new VoidBlobStore()),
+   new HeartbeatServices(1000L, 1000L),
+   
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+   null,
+   new MemoryArchivedExecutionGraphStore(),
+   jobManagerRunnerFactory,
+   testingFatalErrorHandler,
+   fencingTokens);
+   }
 
-   @VisibleForTesting
-   CompletableFuture<Integer> getNumberJobs(Time timeout) {
-   return callAsyncWithoutFencing(
-   () -> listJobs(timeout).get().size(),
-   timeout);
+   private static class HATestingDispatcher extends TestingDispatcher {
+
+   @Nullable
+   private final Queue<DispatcherId> fencingTokens;
+
+   HATestingDispatcher(
+   RpcService rpcService,
+   String endpointId,
+   Configuration configuration,
+   HighAvailabilityServices 
highAvailabilityServices,
+   ResourceManagerGateway resourceManagerGateway,
+   BlobServer blobServer,
+   HeartbeatServices heartbeatServices,
+   JobManagerMetricGroup jobManagerMetricGroup,
+   @Nullable String metricQueryServicePath,
+   ArchivedExecutionGraphStore 
archivedExecutionGraphStore,
+   JobManagerRunnerFactory jobManagerRunnerFactory,
+   FatalErrorHandler fatalErrorHandler,
+   @Nullable Queue<DispatcherId> fencingTokens) 
throws Exception {
+   super(
+   rpcService,
+   endpointId,
+   configuration,
+   highAvailabilityServices,
+   resourceManagerGateway,
+   blobServer,
+   heartbeatServices,
+   jobManagerMetricGroup,
+   metricQueryServicePath,
+   archivedExecutionGraphStore,
+  

[jira] [Commented] (FLINK-10477) Add rescale button to web interface

2019-01-23 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750818#comment-16750818
 ] 

lining commented on FLINK-10477:


Hi, [~yanghua]. How is it going?

> Add rescale button to web interface
> ---
>
> Key: FLINK-10477
> URL: https://issues.apache.org/jira/browse/FLINK-10477
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Sander Ploegsma
>Assignee: vinoyang
>Priority: Minor
>
> Instead of having to use the REST API to rescale a running job, it would be 
> much easier if we were able to rescale a job from the web interface.





[GitHub] libenchao opened a new pull request #7572: fix typo in JobMaster

2019-01-23 Thread GitBox
libenchao opened a new pull request #7572: fix typo in JobMaster
URL: https://github.com/apache/flink/pull/7572
 
 
   Fix typo in JobMaster




[jira] [Commented] (FLINK-10707) Improve Cluster Overview in Flink Dashboard

2019-01-23 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750811#comment-16750811
 ] 

lining commented on FLINK-10707:


Hi, [~wolli]. How is it going?

> Improve Cluster Overview in Flink Dashboard
> ---
>
> Key: FLINK-10707
> URL: https://issues.apache.org/jira/browse/FLINK-10707
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Reporter: Fabian Wollert
>Assignee: Fabian Wollert
>Priority: Major
> Fix For: 1.8.0
>
> Attachments: flink-dashboard.png
>
>
> The flink Dashboard is currently very simple. The following screenshot is a 
> mock of an improvement proposal:
> !flink-dashboard.png|width=806,height=401!





[GitHub] pnowojski commented on a change in pull request #7570: [FLINK-11422] Prefer testing class to mock StreamTask in AbstractStre…

2019-01-23 Thread GitBox
pnowojski commented on a change in pull request #7570: [FLINK-11422] Prefer 
testing class to mock StreamTask in AbstractStre…
URL: https://github.com/apache/flink/pull/7570#discussion_r250487400
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 ##
 @@ -636,4 +597,81 @@ public void close() {
// ignore
}
}
+
+   private class MockStreamTask extends StreamTask {
 
 Review comment:
   Can you try to convert this class to a public, non-inner class? This class 
can definitely evolve into something that's re-used across multiple tests, 
since this is not the only place that's using `mock(StreamTask.class)`. 
   
   I think it's easy to do with the "refactor" options in IntelliJ:
   1. refactor -> convert to static class (to generate a constructor for 
passing all of the fields)
   2. refactor -> move (to move from inner class to outer level)
   
   I guess the only "problematic" thing to solve is the 
`streamTaskStateInitializer` field, which is non-final.




[jira] [Created] (FLINK-11423) Propagate the error message from Main method to JarRunHandler

2019-01-23 Thread Lavkesh Lahngir (JIRA)
Lavkesh Lahngir created FLINK-11423:
---

 Summary: Propagate the error message from Main method to 
JarRunHandler
 Key: FLINK-11423
 URL: https://issues.apache.org/jira/browse/FLINK-11423
 Project: Flink
  Issue Type: Improvement
  Components: Job-Submission
Reporter: Lavkesh Lahngir
Assignee: Lavkesh Lahngir


The jar/run API calls JarRunHandler. The client only receives a simple message 
like "The main method caused an error" without any more detail. When we throw 
ProgramInvocationException in PackagedProgram.callMainMethod(), we should 
include exceptionInMethod.getMessage() too.
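
A hedged sketch of the proposed change (the method and class names are placeholders, not the actual Flink code): include the cause's message when wrapping the exception, so JarRunHandler can return more than the generic text.
{code:java}
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class MainMethodSketch {
	// Sketch only: propagate the cause's message into the wrapping exception
	// (plain Exception stands in for ProgramInvocationException here).
	static void callMainMethod(Method mainMethod, String[] args) throws Exception {
		try {
			mainMethod.invoke(null, (Object) args);
		} catch (InvocationTargetException e) {
			Throwable exceptionInMethod = e.getTargetException();
			throw new Exception(
				"The main method caused an error: " + exceptionInMethod.getMessage(),
				exceptionInMethod);
		}
	}
}
{code}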





[jira] [Updated] (FLINK-10724) Refactor failure handling in check point coordinator

2019-01-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10724:
---
Labels: pull-request-available  (was: )

> Refactor failure handling in check point coordinator
> 
>
> Key: FLINK-10724
> URL: https://issues.apache.org/jira/browse/FLINK-10724
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> At the moment failure handling of asynchronously triggered checkpoint in 
> check point coordinator happens in different places. We could organise it 
> similar way as failure handling of synchronous triggering of checkpoint in 
> *CheckpointTriggerResult* where we classify error cases. This will simplify 
> e.g. integration of error counter for FLINK-4810.
> See also discussion here: [https://github.com/apache/flink/pull/6567]
> The specific design document : 
> https://docs.google.com/document/d/1ce7RtecuTxcVUJlnU44hzcO2Dwq9g4Oyd8_biy94hJc/edit?usp=sharing





[GitHub] yanghua opened a new pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator

2019-01-23 Thread GitBox
yanghua opened a new pull request #7571: [FLINK-10724] Refactor failure 
handling in check point coordinator
URL: https://github.com/apache/flink/pull/7571
 
 
   ## What is the purpose of the change
   
   *This pull request refactors failure handling in the checkpoint coordinator 
and reimplements `failOnCheckpointingErrors`.*
   
   
   ## Brief change log
   
 - *Refactor PendingCheckpoint#abortXXX*
 - *Reimplement `failOnCheckpointingErrors` in the checkpoint coordinator*
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*CheckpointCoordinatorTest#testTriggerAndDeclineWithExecutionFailure* and 
*CheckpointCoordinatorTest#testTriggerAndDeclineSimpleSavepointWithExecutionFailure*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   




[GitHub] zhijiangW commented on a change in pull request #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel

2019-01-23 Thread GitBox
zhijiangW commented on a change in pull request #7199: [FLINK-10662][network] 
Refactor the ChannelSelector interface for single selected channel
URL: https://github.com/apache/flink/pull/7199#discussion_r250484212
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -68,20 +68,29 @@
 
private final boolean flushAlways;
 
+   private final boolean isBroadcastSelector;
+
private Counter numBytesOut = new SimpleCounter();
 
private Counter numBuffersOut = new SimpleCounter();
 
public RecordWriter(ResultPartitionWriter writer) {
-   this(writer, new RoundRobinChannelSelector());
+   this(writer, new RoundRobinChannelSelector(), false);
}
 
-   @SuppressWarnings("unchecked")
-   public RecordWriter(ResultPartitionWriter writer, ChannelSelector 
channelSelector) {
-   this(writer, channelSelector, false);
+   public RecordWriter(
 
 Review comment:
   I am considering how to solve this issue.
   
   I agree it seems better to introduce either a builder method or a 
`RecordWriterBuilder` class for creating the specific RecordWriter. But it 
seems infeasible to check the ChannelSelector instance in the current 
architecture, because the `BatchTask` and `StreamTask` classes live in 
different modules, so an `instanceof BroadcastPartitioner` check would only 
work in the streaming module and not for `BatchTask` in the runtime module.
   
   There might be two ways to address this issue:
   1. Introduce a `ChannelSelector#isBroadcast` method to avoid the module 
dependence (see the sketch below).
   2. Check the broadcast mode separately in the `StreamTask` and `BatchTask` 
classes, which means no unified builder for creating `RecordWriter`.
   
   I slightly prefer the second way in order not to change the interface much, 
although the instance check would then be scattered across two different 
places. What do you think?
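
   A rough sketch of option 1 (the method name and default are my assumptions, not a settled API): the selector reports its broadcast nature itself, so neither the runtime nor the streaming module needs an `instanceof BroadcastPartitioner` check.
   
   ```java
   // Sketch only: the selector exposes whether it broadcasts every record.
   public interface ChannelSelector<T> {
   
   	/** Returns the channels the given record shall be written to. */
   	int[] selectChannels(T record, int numberOfChannels);
   
   	/** True if this selector replicates every record to all channels. */
   	default boolean isBroadcast() {
   		return false;
   	}
   }
   ```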




[GitHub] klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
klion26 commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250477821
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
 ##
 @@ -36,43 +36,47 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static 
org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;
 
 /**
- * Data transfer utils for {@link RocksDBKeyedStateBackend}.
+ * Helper class for downloading RocksDB state.
  */
-class RocksDbStateDataTransfer {
+public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
+   public RocksDBStateDownloader(int restoringThreadNum) {
+   super(restoringThreadNum);
+   }
 
-   static void transferAllStateDataToDirectory(
+   /**
+* Transfer all state data to the target directory using the specified 
number of threads.
+*
+* @param restoreStateHandle Handles used to retrieve the state data.
+* @param dest The target directory in which the state data will be stored.
+* @param closeableRegistry The registry with which all input/output streams 
will be registered and unregistered.
+*
+* @throws Exception Thrown if the state data cannot be transferred.
+*/
+   public void transferAllStateDataToDirectory(
IncrementalKeyedStateHandle restoreStateHandle,
Path dest,
-   int restoringThreadNum,
CloseableRegistry closeableRegistry) throws Exception {
 
final Map<StateHandleID, StreamStateHandle> sstFiles =
restoreStateHandle.getSharedState();
final Map<StateHandleID, StreamStateHandle> miscFiles =
restoreStateHandle.getPrivateState();
 
-   downloadDataForAllStateHandles(sstFiles, dest, 
restoringThreadNum, closeableRegistry);
-   downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
+   downloadDataForAllStateHandles(sstFiles, dest, 
closeableRegistry);
 
 Review comment:
   @azagrebin We cannot share the `closeableRegistry` because of support for 
parallel snapshots. If we share the `closeableRegistry` across parallel 
snapshots, a later-completing snapshot will run into an `IOException("Cannot 
register Closeable, registry is already closed. Closing argument.")` when 
registering its input/output streams (the `closeableRegistry` was already 
closed in `AsyncSnapshotCallable#closeSnapshotIO`), as the [Travis log 
said](https://travis-ci.org/apache/flink/jobs/483728783).
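
   A minimal sketch of that failure mode (collapsed to a single thread here; the real case involves two parallel snapshots sharing one registry):
   
   ```java
   import org.apache.flink.core.fs.CloseableRegistry;
   
   import java.io.ByteArrayInputStream;
   import java.io.IOException;
   
   public class SharedRegistryDemo {
   	public static void main(String[] args) throws IOException {
   		CloseableRegistry shared = new CloseableRegistry();
   		shared.close(); // the first snapshot finished and closed the registry
   
   		// A later snapshot now registers its stream and fails with
   		// IOException("Cannot register Closeable, registry is already
   		// closed. Closing argument.").
   		shared.registerCloseable(new ByteArrayInputStream(new byte[0]));
   	}
   }
   ```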




[GitHub] klion26 edited a comment on issue #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
klion26 edited a comment on issue #7351: [FLINK-11008][State Backends, 
Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#issuecomment-457074711
 
 
   Seems like a relevant test failed after reusing the registry; I'm looking 
into it.




[GitHub] klion26 commented on issue #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
klion26 commented on issue #7351: [FLINK-11008][State Backends, 
Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#issuecomment-457074711
 
 
   Seems like a relevant test failed; I'm looking into it.




[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2019-01-23 Thread GitBox
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r250467958
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
 ##
 @@ -55,4 +57,13 @@
 * @return null or the target topic
 */
String getTargetTopic(T element);
+
+   /**
+*
+* @param element The incoming element to be serialized
+* @return collection of headers (may be empty)
+*/
+   default Iterable<Map.Entry<String, byte[]>> headers(T element) {
 
 Review comment:
   @azagrebin, so you suggest having a Flink class to represent a Kafka Header 
instead of Map.Entry?




[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2019-01-23 Thread GitBox
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r250457396
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
 ##
 @@ -55,4 +57,13 @@
 * @return null or the target topic
 */
String getTargetTopic(T element);
+
+   /**
+*
+* @param element The incoming element to be serialized
+* @return collection of headers (may be empty)
+*/
+   default Iterable<Map.Entry<String, byte[]>> headers(T element) {
 
 Review comment:
   We can't have a default method creating `ProducerRecord`, because 0.8 
doesn't have a timestamp parameter in the ProducerRecord constructor.




[jira] [Assigned] (FLINK-11397) Speed up initialization of AbstractStreamOperatorTestHarness

2019-01-23 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun reassigned FLINK-11397:


Assignee: (was: TisonKun)

> Speed up initialization of AbstractStreamOperatorTestHarness
> 
>
> Key: FLINK-11397
> URL: https://issues.apache.org/jira/browse/FLINK-11397
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Priority: Major
>
> Currently Kafka connector tests are unbearably slow, which is tracked by 
> FLINK-10603. Upon investigation, the construction and initialization of 
> {{AbstractStreamOperatorTestHarness}} is quite slow.
> -When walking down the code, it amazed me that {{mockTask = 
> mock(StreamTask.class);}} cost a few seconds to finish. If we can introduce 
> a test class instead of the mock framework, the situation might be eased.-
> cc [~Zentol] [~pnowojski]





[jira] [Updated] (FLINK-11422) Prefer testing class to mock StreamTask in AbstractStreamOperatorTestHarness

2019-01-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11422:
---
Labels: pull-request-available  (was: )

> Prefer testing class to mock StreamTask in AbstractStreamOperatorTestHarness
> 
>
> Key: FLINK-11422
> URL: https://issues.apache.org/jira/browse/FLINK-11422
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>






[GitHub] TisonKun opened a new pull request #7570: [FLINK-11422] Prefer testing class to mock StreamTask in AbstractStre…

2019-01-23 Thread GitBox
TisonKun opened a new pull request #7570: [FLINK-11422] Prefer testing class to 
mock StreamTask in AbstractStre…
URL: https://github.com/apache/flink/pull/7570
 
 
   …amOperatorTestHarness
   
   ## What is the purpose of the change
   
   As the title says, we prefer using a testing class to a Mockito mock.
   
   ## Brief change log
   
   Introduce an inner class `MockStreamTask` and replace the mocked StreamTask 
with it.
   
   ## Verifying this change
   
   This change is already covered by existing tests that use 
StreamOperatorTestHarness.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive):(no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   
   cc @pnowojski 
   




[GitHub] klion26 commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
klion26 commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250456369
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
 ##
 @@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.CheckedSupplier;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+/**
+ * Helper class for uploading RocksDB state.
+ */
+public class RocksDBStateUploader extends RocksDBStateDownloader {
 
 Review comment:
   My bad, I chose the wrong one when using auto-complete.




[jira] [Created] (FLINK-11422) Prefer testing class to mock StreamTask in AbstractStreamOperatorTestHarness

2019-01-23 Thread TisonKun (JIRA)
TisonKun created FLINK-11422:


 Summary: Prefer testing class to mock StreamTask in 
AbstractStreamOperatorTestHarness
 Key: FLINK-11422
 URL: https://issues.apache.org/jira/browse/FLINK-11422
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.8.0
Reporter: TisonKun
Assignee: TisonKun
 Fix For: 1.8.0








[jira] [Commented] (FLINK-11397) Speed up initialization of AbstractStreamOperatorTestHarness

2019-01-23 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750680#comment-16750680
 ] 

TisonKun commented on FLINK-11397:
--

[~pnowojski] I found that "cause" when running a debugger step by step. 
However, when I replaced the mock class with a testing class, the total test 
time wasn't reduced. I'm sorry for mistaking the cause.

As for getting rid of the Mockito mock, I will open another Jira to track it.

> Speed up initialization of AbstractStreamOperatorTestHarness
> 
>
> Key: FLINK-11397
> URL: https://issues.apache.org/jira/browse/FLINK-11397
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Priority: Major
>
> Currently Kafka connector tests are unbearably slow, which is tracked by 
> FLINK-10603. Upon investigation, the construction and initialization of 
> {{AbstractStreamOperatorTestHarness}} is quite slow.
> -When walking down the code, it amazed me that {{mockTask = 
> mock(StreamTask.class);}} cost a few seconds to finish. If we can introduce 
> a test class instead of the mock framework, the situation might be eased.-
> cc [~Zentol] [~pnowojski]





[jira] [Updated] (FLINK-11397) Speed up initialization of AbstractStreamOperatorTestHarness

2019-01-23 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-11397:
-
Description: 
Currently Kafka connector tests are unbearably slow, which is tracked by 
FLINK-10603. Upon investigation, the construction and initialization of 
{{AbstractStreamOperatorTestHarness}} is quite slow.

-When walking down the code, it amazed me that {{mockTask = 
mock(StreamTask.class);}} cost a few seconds to finish. If we can introduce a 
test class instead of the mock framework, the situation might be eased.-

cc [~Zentol] [~pnowojski]

  was:
Currently Kafka connector tests are unbearably slow, which is tracked by 
FLINK-10603. Upon investigation, the construction and initialization of 
{{AbstractStreamOperatorTestHarness}} is quite slow.

When walking down the code, it amazed me that {{mockTask = 
mock(StreamTask.class);}} cost a few seconds to finish. If we can introduce a 
test class instead of the mock framework, the situation might be eased.

cc [~Zentol] [~pnowojski]


> Speed up initialization of AbstractStreamOperatorTestHarness
> 
>
> Key: FLINK-11397
> URL: https://issues.apache.org/jira/browse/FLINK-11397
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>
> Currently Kafka connector tests are unbearably slow, which is tracked by 
> FLINK-10603. Upon investigation, the construction and initialization of 
> {{AbstractStreamOperatorTestHarness}} is quite slow.
> -When walking down the code, it amazed me that {{mockTask = 
> mock(StreamTask.class);}} cost a few seconds to finish. If we can introduce 
> a test class instead of the mock framework, the situation might be eased.-
> cc [~Zentol] [~pnowojski]





[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric

2019-01-23 Thread Shuyi Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750675#comment-16750675
 ] 

Shuyi Chen commented on FLINK-7608:
---

[~Zentol], [~aljoscha], it appears that after this PR I can no longer find 
the latency histogram in the web UI or the REST API. Is it because it's now 
grouped by operator id rather than vertex id? Is there a way I can find this 
metric through the web UI or REST API? Thanks a lot.

> LatencyGauge change to  histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Hai Zhou
>Assignee: Hai Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831]  to 
> export metrics the log file.
> I found:
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Map.0.latency:
>  value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, 
> p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Sink- Unnamed.0.latency: 
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, 
> p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}





[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2019-01-23 Thread GitBox
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r250453890
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##
 @@ -32,6 +34,49 @@
  */
 @PublicEvolving
public interface KeyedDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
+   /**
+* Kafka record to be deserialized.
+* Record consists of a key/value pair, topic name, partition offset, 
headers and a timestamp (if available).
+*/
+   interface Record {
 
 Review comment:
   Done in 2adbacb




[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2019-01-23 Thread GitBox
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r250453715
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka011ConsumerRecord.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Function;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.AbstractMap;
+import java.util.Map;
+
+/**
+ * Extends base Kafka09ConsumerRecord to provide access to Kafka headers.
+ */
+class Kafka011ConsumerRecord extends Kafka09ConsumerRecord {
+   /**
+* Wraps {@link Header} as Map.Entry.
+*/
+   private static final Function<Header, Map.Entry<String, byte[]>> 
HEADER_TO_MAP_ENTRY_FUNCTION =
+   new Function<Header, Map.Entry<String, byte[]>>() {
+   @Nonnull
+   @Override
+   public Map.Entry<String, byte[]> apply(@Nullable Header 
header) {
+   return new 
AbstractMap.SimpleImmutableEntry<>(header.key(), header.value());
+   }
+   };
+
+   Kafka011ConsumerRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
+   super(consumerRecord);
+   }
+
+   @Override
+   public Iterable<Map.Entry<String, byte[]>> headers() {
+   return Iterables.transform(consumerRecord.headers(), 
HEADER_TO_MAP_ENTRY_FUNCTION);
 
 Review comment:
   Resolved (file removed) in 2adbacb




[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2019-01-23 Thread GitBox
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r250453628
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ##
 @@ -370,8 +370,8 @@ else if (partitionsRemoved) {

keyPayload.get(keyBytes);
}
 
-   final T value = 
deserializer.deserialize(keyBytes, valueBytes,
-   
currentPartition.getTopic(), currentPartition.getPartition(), offset);
+   final T value = 
deserializer.deserialize(
 
 Review comment:
   Done in 2adbacb




[GitHub] alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2019-01-23 Thread GitBox
alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add 
ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-457051271
 
 
   The compile problem was because master's flink-connector-kafka-base uses 
Kafka 0.10.2.1, where the ConsumerRecord constructor has a different signature 
than in Kafka 0.8, which flink-connector-kafka-base used when I branched the 
PR branch. To be signature-independent, I changed the test to create the 
ConsumerRecord via a mock instead of the constructor.




[jira] [Updated] (FLINK-11301) Travis failed: No output has been received in the last 10m0s

2019-01-23 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng updated FLINK-11301:

Attachment: error.log

> Travis failed: No output has been received in the last 10m0s
> 
>
> Key: FLINK-11301
> URL: https://issues.apache.org/jira/browse/FLINK-11301
> Project: Flink
>  Issue Type: Bug
>  Components: Travis
>Reporter: Hequn Cheng
>Priority: Major
> Attachments: error.log
>
>
> -[https://api.travis-ci.org/v3/job/47082/log.txt]-
> new log link: 
> [https://api.travis-ci.org/v3/job/483359744/log.txt|https://api.travis-ci.org/v3/job/483359744/log.txt]





[jira] [Commented] (FLINK-11301) Travis failed: No output has been received in the last 10m0s

2019-01-23 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750659#comment-16750659
 ] 

Hequn Cheng commented on FLINK-11301:
-

I encountered the same error in my last PR.  Log link has been updated in the 
Description.

> Travis failed: No output has been received in the last 10m0s
> 
>
> Key: FLINK-11301
> URL: https://issues.apache.org/jira/browse/FLINK-11301
> Project: Flink
>  Issue Type: Bug
>  Components: Travis
>Reporter: Hequn Cheng
>Priority: Major
>
> -[https://api.travis-ci.org/v3/job/47082/log.txt]-
> new log link: 
> [https://api.travis-ci.org/v3/job/483359744/log.txt|https://api.travis-ci.org/v3/job/483359744/log.txt]





[jira] [Updated] (FLINK-11301) Travis failed: No output has been received in the last 10m0s

2019-01-23 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng updated FLINK-11301:

Description: 
-[https://api.travis-ci.org/v3/job/47082/log.txt]-

new log link: 
[https://api.travis-ci.org/v3/job/483359744/log.txt|https://api.travis-ci.org/v3/job/483359744/log.txt]

  was:https://api.travis-ci.org/v3/job/47082/log.txt


> Travis failed: No output has been received in the last 10m0s
> 
>
> Key: FLINK-11301
> URL: https://issues.apache.org/jira/browse/FLINK-11301
> Project: Flink
>  Issue Type: Bug
>  Components: Travis
>Reporter: Hequn Cheng
>Priority: Major
>
> -[https://api.travis-ci.org/v3/job/47082/log.txt]-
> new log link: 
> [https://api.travis-ci.org/v3/job/483359744/log.txt|https://api.travis-ci.org/v3/job/483359744/log.txt]





[jira] [Reopened] (FLINK-11301) Travis failed: No output has been received in the last 10m0s

2019-01-23 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng reopened FLINK-11301:
-

> Travis failed: No output has been received in the last 10m0s
> 
>
> Key: FLINK-11301
> URL: https://issues.apache.org/jira/browse/FLINK-11301
> Project: Flink
>  Issue Type: Bug
>  Components: Travis
>Reporter: Hequn Cheng
>Priority: Major
>
> https://api.travis-ci.org/v3/job/47082/log.txt





[jira] [Created] (FLINK-11421) Providing more compilation options for code-generated operators

2019-01-23 Thread Liya Fan (JIRA)
Liya Fan created FLINK-11421:


 Summary: Providing more compilation options for code-generated 
operators
 Key: FLINK-11421
 URL: https://issues.apache.org/jira/browse/FLINK-11421
 Project: Flink
  Issue Type: New Feature
  Components: Core
Reporter: Liya Fan
Assignee: Liya Fan


Flink supports some operators (like Calc, Hash Agg, Hash Join, etc.) by code 
generation. That is, Flink generates their source code dynamically, then 
compiles it into Java bytecode, which is loaded and executed at runtime.

By default, Flink compiles the generated source code with Janino. This is fast, 
as the compilation often finishes in hundreds of milliseconds. The generated 
bytecode, however, is of poor quality. To illustrate, we used the Java Compiler 
API (JCA) to compile the generated code. Experiments on TPC-H (1 TB) queries 
show that the E2E time can be more than 10% shorter when operators are compiled 
by JCA, even though compiling with JCA takes more time (a few seconds).

Therefore, we believe it is beneficial to compile generated code with JCA in 
the following scenarios: 1) for batch jobs, the E2E time is relatively long, so 
it is worth spending more time compiling to generate high-quality bytecode; 
2) for repeated streaming jobs, the generated code is compiled once and run 
many times, so it pays to spend more time compiling the first time and enjoy 
the higher bytecode quality on later runs.

Based on these observations, we want to provide a compilation option (Janino, 
JCA, or dynamic) in Flink, so that users can choose the one suitable for their 
specific scenario and obtain better performance whenever possible.
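
A hypothetical sketch of the proposed option (the enum and its values are assumptions drawn from the description above, not an existing Flink setting):
{code:java}
// Hypothetical compilation strategy for code-generated operators.
public enum CodegenCompiler {
	JANINO,  // fast compilation, lower bytecode quality (current default)
	JCA,     // Java Compiler API: slower compilation, better bytecode
	DYNAMIC  // choose per job, e.g. JCA for batch, Janino for short runs
}
{code}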





[jira] [Commented] (FLINK-11416) DISTINCT on a JOIN inside of an UNION is not working

2019-01-23 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750611#comment-16750611
 ] 

Hequn Cheng commented on FLINK-11416:
-

Hi [~lordon], thanks for reporting the issue. It is probably a bug in Calcite. 
I have created a Calcite issue 
[here|https://issues.apache.org/jira/browse/CALCITE-2801] and explained the 
reason in it.

As a workaround, you can remove the {{.distinct()}} after {{.select("c")}}. The 
result should still be correct, as union also performs a distinct for you 
(union equals unionAll + distinct).
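
A sketch of the workaround in the Table API (tableA/tableB/tableC are placeholders for the tables in the failing job):
{code:java}
// Drop the distinct() on the join branch; the union de-duplicates anyway.
Table joined = tableA.join(tableB, "a = b").select("c"); // no .distinct() here
Table result = joined.union(tableC.select("c"));
{code}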

> DISTINCT on a JOIN inside of an UNION is not working
> 
>
> Key: FLINK-11416
> URL: https://issues.apache.org/jira/browse/FLINK-11416
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.7.0, 1.7.1
>Reporter: Elias Saalmann
>Priority: Minor
>
> I get an error (Error while applying rule AggregateUnionAggregateRule) when 
> applying a DISTINCT to the result of a JOIN within a UNION, e.g.
> (
>    SELECT DISTINCT c
>    FROM a JOIN b ON a = b
>  )
>  UNION
>  (
>    SELECT c
>    FROM c
>  )
> Full stacktrace:
> Exception in thread "main" java.lang.RuntimeException: Error while applying 
> rule AggregateUnionAggregateRule, args 
> [rel#197:LogicalAggregate.NONE(input=rel#196:Subset#21.NONE,group=\{0}), 
> rel#194:LogicalUnion.NONE(input#0=rel#188:Subset#18.NONE,input#1=rel#189:Subset#19.NONE,all=true),
>  rel#221:LogicalAggregate.NONE(input=rel#184:Subset#16.NONE,group=\{2}), 
> rel#164:LogicalTableScan.NONE(table=[_DataSetTable_2])] 
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
>  
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
>  
>     at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339) 
>     at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>  
>     at 
> org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>  
>     at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:455)
>  
>     at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:475)
>  
>     at 
> org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:165)
>  
>     at org.myorg.quickstart.TableJob1.main(TableJob1.java:51) 
> Caused by: java.lang.IllegalArgumentException: Cannot compute compatible row 
> type for arguments to set op: RecordType(VARCHAR(65536) a, VARCHAR(65536) b, 
> VARCHAR(65536) c), RecordType(VARCHAR(65536) d) 
>     at org.apache.calcite.rel.core.SetOp.deriveRowType(SetOp.java:111) 
>     at 
> org.apache.calcite.rel.AbstractRelNode.getRowType(AbstractRelNode.java:222) 
>     at org.apache.calcite.tools.RelBuilder$Frame.<init>(RelBuilder.java:2065) 
>     at org.apache.calcite.tools.RelBuilder$Frame.<init>(RelBuilder.java:2050) 
>     at org.apache.calcite.tools.RelBuilder.push(RelBuilder.java:243) 
>     at org.apache.calcite.tools.RelBuilder.setOp(RelBuilder.java:1370) 
>     at org.apache.calcite.tools.RelBuilder.union(RelBuilder.java:1390) 
>     at org.apache.calcite.tools.RelBuilder.union(RelBuilder.java:1380) 
>     at 
> org.apache.calcite.rel.rules.AggregateUnionAggregateRule.onMatch(AggregateUnionAggregateRule.java:130)
>  
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
>  
>     ... 8 more
> Full example reproducing the error: 
> [GitHub|https://github.com/lordon/flink_quickstart/blob/master/src/main/java/org/myorg/quickstart/TableJob1.java]





[jira] [Created] (FLINK-11420) Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBounds

2019-01-23 Thread JIRA
Jürgen Kreileder created FLINK-11420:


 Summary: Serialization of case classes containing a Map[String, 
Any] sometimes throws ArrayIndexOutOfBounds
 Key: FLINK-11420
 URL: https://issues.apache.org/jira/browse/FLINK-11420
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Affects Versions: 1.7.1
Reporter: Jürgen Kreileder


We frequently run into random ArrayIndexOutOfBounds exceptions when Flink tries 
to serialize Scala case classes containing a Map[String, Any] (Any being 
String, Long, Int, or Boolean) with the FsStateBackend. (This probably happens 
with any case class containing a type requiring Kryo, see this thread for 
instance: 
[http://mail-archives.apache.org/mod_mbox/flink-user/201710.mbox/%3cCANNGFpjX4gjV=df6tlfeojsb_rhwxs_ruoylqcqv2gvwqtt...@mail.gmail.com%3e])

Disabling asynchronous snapshots seems to work around the problem, so maybe 
something is not thread-safe in CaseClassSerializer.

Our objects look like this:
{code}
case class Event(timestamp: Long, [...], content: Map[String, Any])
case class EnrichedEvent(event: Event, additionalInfo: Map[String, Any])
{code}
I've looked at a few of the exceptions in a debugger. It always happens when 
serializing the right-hand side of a tuple from EnrichedEvent -> Event -> 
content, e.g.: 13 from ("foo", 13) or false from ("bar", false).

Stacktrace:
{code:java}
java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 0
 at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
 at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
 at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
 at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:69)
 at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234)
 at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
 at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
 at 
org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99)
 at 
org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
 at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
 at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
 at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95)
 at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
 at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
 at java.base/java.lang.Thread.run(Thread.java:834){code}
 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11420) Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBoundsException

2019-01-23 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/FLINK-11420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jürgen Kreileder updated FLINK-11420:
-
Summary: Serialization of case classes containing a Map[String, Any] 
sometimes throws ArrayIndexOutOfBoundsException  (was: Serialization of case 
classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBounds)

> Serialization of case classes containing a Map[String, Any] sometimes throws 
> ArrayIndexOutOfBoundsException
> ---
>
> Key: FLINK-11420
> URL: https://issues.apache.org/jira/browse/FLINK-11420
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
>
> We frequently run into random ArrayIndexOutOfBounds exceptions when Flink 
> tries to serialize Scala case classes containing a Map[String, Any] (Any 
> being String, Long, Int, or Boolean) with the FsStateBackend. (This probably 
> happens with any case class containing a type that requires Kryo; see this 
> thread for instance: 
> [http://mail-archives.apache.org/mod_mbox/flink-user/201710.mbox/%3cCANNGFpjX4gjV=df6tlfeojsb_rhwxs_ruoylqcqv2gvwqtt...@mail.gmail.com%3e])
> Disabling asynchronous snapshots seems to work around the problem, so maybe 
> something in CaseClassSerializer is not thread-safe.
> Our objects look like this:
> {code}
> case class Event(timestamp: Long, [...], content: Map[String, Any])
> case class EnrichedEvent(event: Event, additionalInfo: Map[String, Any])
> {code}
> I've looked at a few of the exceptions in a debugger. It always happens when 
> serializing the right-hand side of a tuple from EnrichedEvent -> Event -> 
> content, e.g. 13 from ("foo", 13) or false from ("bar", false).
> Stacktrace:
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 0
>  at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>  at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>  at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>  at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:69)
>  at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234)
>  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
>  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
>  at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>  at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at 
> org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99)
>  at 
> org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
>  at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>  at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>  at 
> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95)
>  at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
>  at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>  at java.base/java.lang.Thread.run(Thread.java:834){code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…

2019-01-23 Thread GitBox
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] 
connection leak when partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#discussion_r250397313
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ##
 @@ -470,7 +470,20 @@ public void open(Configuration configuration) throws 
Exception {
 
subscribedPartitionsToStartOffsets = new HashMap<>();
 
-   List<KafkaTopicPartition> allPartitions = 
partitionDiscoverer.discoverPartitions();
+   List<KafkaTopicPartition> allPartitions;
+   try {
+   allPartitions = 
partitionDiscoverer.discoverPartitions();
+   } finally {
+   if (discoveryIntervalMillis == 
PARTITION_DISCOVERY_DISABLED) {
+   // When partition discovery is disabled,
+   // we should close partitionDiscoverer after 
the initial discovery.
+   // Otherwise we may have a connection leak
+   // if the open method throws an exception after 
partitionDiscoverer is constructed.
+   // In that case, the run method won't be executed
+   // and partitionDiscoverer.close() won't be 
called.
+   partitionDiscoverer.close();
 
 Review comment:
   thank you for the suggestion. updated. please ignore whitespace changes 
caused by the indentation change of the try-finally block.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…

2019-01-23 Thread GitBox
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] 
connection leak when partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#discussion_r250398973
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ##
 @@ -732,9 +745,7 @@ public void run() {
throw new RuntimeException(discoveryLoopError);
}
} else {
-   // won't be using the discoverer
-   partitionDiscoverer.close();
-
+   // partitionDiscoverer is already closed in open method
 
 Review comment:
   I am not sure it is necessary to wrap `runFetchLoop` with try-finally.
   1) when discovery is disabled, we already take care of it in the open method
   2) when discovery is enabled, cancel() takes care of it. What if some other 
place in the run method throws an exception? We always need cancel() to clean up 
resources if necessary; see the sketch below.
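   A sketch of the shape I have in mind (assumed, not the actual connector code):
   ```java
   @Override
   public void cancel() {
       // make the fetch loop in run() exit
       running = false;

       // cancel() is always invoked for cleanup once run() may have started,
       // so the discoverer can be closed here regardless of where run() failed
       try {
           partitionDiscoverer.close();
       } catch (Exception e) {
           LOG.warn("Error while closing the partition discoverer.", e);
       }
   }
   ```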


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…

2019-01-23 Thread GitBox
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] 
connection leak when partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#discussion_r250397468
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 ##
 @@ -464,6 +466,34 @@ public void go() throws Exception {
runThread.sync();
}
 
+   @Test
+   public void testClosePartitionDiscovererWhenOpenThrowException() throws 
Exception {
+   final AbstractPartitionDiscoverer mockPartitionDiscoverer = 
mock(AbstractPartitionDiscoverer.class);
+   final AtomicBoolean partitionDiscovererClosed = new 
AtomicBoolean(false);
+   doAnswer((i) -> {
+   partitionDiscovererClosed.set(true);
+   return null;
+   }).when(mockPartitionDiscoverer).close();
+
+   final DummyFlinkKafkaConsumer consumer = new 
DummyFlinkKafkaConsumerThrowExceptionInOpen<>(mockPartitionDiscoverer);
+   consumer.setCommitOffsetsOnCheckpoints(false); // disabling 
offset committing should override everything
+
+   try {
+   setupConsumer(
+   consumer,
+   false,
+   null,
+   false, // enable checkpointing; auto commit 
should be ignored
+   0,
+   1);
 
 Review comment:
   good catch. fixed


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…

2019-01-23 Thread GitBox
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] 
connection leak when partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#discussion_r250397435
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 ##
 @@ -464,6 +466,34 @@ public void go() throws Exception {
runThread.sync();
}
 
+   @Test
+   public void testClosePartitionDiscovererWhenOpenThrowException() throws 
Exception {
+   final AbstractPartitionDiscoverer mockPartitionDiscoverer = 
mock(AbstractPartitionDiscoverer.class);
 
 Review comment:
   great idea. done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…

2019-01-23 Thread GitBox
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] 
connection leak when partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#discussion_r250397313
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ##
 @@ -470,7 +470,20 @@ public void open(Configuration configuration) throws 
Exception {
 
subscribedPartitionsToStartOffsets = new HashMap<>();
 
-   List<KafkaTopicPartition> allPartitions = 
partitionDiscoverer.discoverPartitions();
+   List<KafkaTopicPartition> allPartitions;
+   try {
+   allPartitions = 
partitionDiscoverer.discoverPartitions();
+   } finally {
+   if (discoveryIntervalMillis == 
PARTITION_DISCOVERY_DISABLED) {
+   // When partition discovery is disabled,
+   // we should close partitionDiscoverer after 
the initial discovery.
+   // Otherwise we may have a connection leak
+   // if the open method throws an exception after 
partitionDiscoverer is constructed.
+   // In that case, the run method won't be executed
+   // and partitionDiscoverer.close() won't be 
called.
+   partitionDiscoverer.close();
 
 Review comment:
   thank you for the suggestion. updated. please ignore whitespace changes 
caused by the new try-finally block.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11008) Speed up upload checkpoint files using multi-thread

2019-01-23 Thread Andrey Zagrebin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrey Zagrebin updated FLINK-11008:

Fix Version/s: 1.8.0

> Speed up upload checkpoint files using multi-thread
> ---
>
> Key: FLINK-11008
> URL: https://issues.apache.org/jira/browse/FLINK-11008
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As [FLINK-10461|https://issues.apache.org/jira/browse/FLINK-10461] did, we 
> could speed up uploading checkpoint files by using multiple threads.
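> A minimal sketch of the idea (helper names such as uploadFile are 
> assumptions, not the pull request's actual code):
> {code:java}
> ExecutorService executor = Executors.newFixedThreadPool(numberOfTransferThreads);
> List<CompletableFuture<Void>> futures = new ArrayList<>();
> for (Path file : filesToUpload) {
>     // each file upload runs on its own pool thread
>     futures.add(CompletableFuture.runAsync(() -> uploadFile(file), executor));
> }
> // wait for all uploads to finish before completing the checkpoint
> FutureUtils.waitForAll(futures).get();
> executor.shutdown();
> {code}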



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11397) Speed up initialization of AbstractStreamOperatorTestHarness

2019-01-23 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun reassigned FLINK-11397:


Assignee: TisonKun

> Speed up initialization of AbstractStreamOperatorTestHarness
> 
>
> Key: FLINK-11397
> URL: https://issues.apache.org/jira/browse/FLINK-11397
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>
> Currently Kafka connector tests are unbearably slow, which is tracked by 
> FLINK-10603. On investigation, the construction and initialization of 
> {{AbstractStreamOperatorTestHarness}} is quite slow.
> Walking down the code, it amazed me that {{mockTask = 
> mock(StreamTask.class);}} costs a few seconds to finish. If we can introduce 
> a test class instead of the mock framework, the situation might be eased.
> cc [~Zentol] [~pnowojski]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250325106
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
 ##
 @@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.CheckedSupplier;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+/**
+ * Helper class for uploading RocksDB state.
+ */
+public class RocksDBStateUploader extends RocksDBStateDownloader {
 
 Review comment:
   `extends RocksDBStateDataTransfer` not `RocksDBStateDownloader`?
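   For clarity, the hierarchy I would expect looks roughly like this (a sketch):
   ```java
   // shared base holding the ExecutorService used for parallel transfers
   class RocksDBStateDataTransfer { /* ... */ }

   // restore path: DFS -> local RocksDB directory
   class RocksDBStateDownloader extends RocksDBStateDataTransfer { /* ... */ }

   // snapshot path: local RocksDB files -> checkpoint storage
   class RocksDBStateUploader extends RocksDBStateDataTransfer { /* ... */ }
   ```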


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250317750
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java
 ##
 @@ -97,6 +97,10 @@ protected void cancel() {
}
}
 
+   protected CloseableRegistry getSnapshotCloseableRegistry() {
+   return snapshotCloseableRegistry;
 
 Review comment:
   could we just make `snapshotCloseableRegistry` protected and use it directly 
in `RocksIncrementalSnapshotStrategy`? I think we then would not need the 
`(un)registerCloseableForCancellation` methods either, and could replace calls 
to them with direct calls to 
`snapshotCloseableRegistry.(un)registerCloseable`.
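   i.e. something along these lines (a simplified sketch of the suggestion):
   ```java
   public abstract class AsyncSnapshotCallable<T> implements Callable<T> {

       // visible to subclasses, so they can register/unregister closeables directly
       protected final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();

       // ...
   }

   // in RocksIncrementalSnapshotStrategy:
   // snapshotCloseableRegistry.registerCloseable(outputStream);
   // ...
   // snapshotCloseableRegistry.unregisterCloseable(outputStream);
   ```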


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250323327
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -701,17 +702,22 @@ public RocksDBNativeMetricOptions 
getMemoryWatcherOptions() {
}
 
/**
-* Gets the thread number will used for downloading files from DFS when 
restore.
+* Gets the number of threads used to transfer files while 
snapshotting/restoring.
 */
-   public int getNumberOfRestoringThreads() {
-   return numberOfRestoringThreads == 
UNDEFINED_NUMBER_OF_RESTORING_THREADS ?
-   
RocksDBOptions.CHECKPOINT_RESTORE_THREAD_NUM.defaultValue() : 
numberOfRestoringThreads;
+   public int getNumberOfTransferingThreads() {
+   return numberOfTransferingThreads == 
UNDEFINED_NUMBER_OF_TRANSFERING_THREADS ?
+   CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue() : 
numberOfTransferingThreads;
}
 
-   public void setNumberOfRestoringThreads(int numberOfRestoringThreads) {
-   Preconditions.checkArgument(numberOfRestoringThreads > 0,
-   "The number of threads used to download files from DFS 
in RocksDBStateBackend should > 0.");
-   this.numberOfRestoringThreads = numberOfRestoringThreads;
+   /**
+* Sets the number of threads used to transfer files while 
snapshotting/restoring.
+*
+* @param numberOfTransferingThreads The number of threads used to 
download files from DFS while restoring.
 
 Review comment:
   `used to download files from DFS while restoring` -> `to transfer files 
while snapshotting/restoring`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250322353
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
 ##
 @@ -47,10 +47,10 @@
HEAP.name(), ROCKSDB.name()));
 
/**
-* The number of threads used to download files from DFS in 
RocksDBStateBackend.
+* The number of threads used to transfer(download and upload) files in 
RocksDBStateBackend.
 
 Review comment:
   missing space before the `(` in `transfer(download`, also below in similar places


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250324277
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
 ##
 @@ -36,43 +36,47 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static 
org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;
 
 /**
- * Data transfer utils for {@link RocksDBKeyedStateBackend}.
+ * Helper class for downloading RocksDB state.
  */
-class RocksDbStateDataTransfer {
+public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
+   public RocksDBStateDownloader(int restoringThreadNum) {
+   super(restoringThreadNum);
+   }
 
-   static void transferAllStateDataToDirectory(
+   /**
+    * Transfers all state data to the target directory using the specified 
number of threads.
+    *
+    * @param restoreStateHandle Handles used to retrieve the state data.
+    * @param dest The target directory in which the state data will be stored.
+    * @param closeableRegistry Registry with which all input/output streams will 
be registered and unregistered.
+    *
+    * @throws Exception Thrown if the state data cannot be fully transferred.
+    */
+   public void transferAllStateDataToDirectory(
IncrementalKeyedStateHandle restoreStateHandle,
Path dest,
-   int restoringThreadNum,
CloseableRegistry closeableRegistry) throws Exception {
 
final Map<StateHandleID, StreamStateHandle> sstFiles =
restoreStateHandle.getSharedState();
final Map<StateHandleID, StreamStateHandle> miscFiles =
restoreStateHandle.getPrivateState();
 
-   downloadDataForAllStateHandles(sstFiles, dest, 
restoringThreadNum, closeableRegistry);
-   downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
+   downloadDataForAllStateHandles(sstFiles, dest, 
closeableRegistry);
 
 Review comment:
   could `closeableRegistry` be shared the same way as `executorService` in 
`RocksDBStateDataTransfer`? Then we would not need to pass it to all class methods.
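   Roughly like this (a sketch; field and constructor names are assumed):
   ```java
   class RocksDBStateDataTransfer implements AutoCloseable {

       protected final ExecutorService executorService;
       // shared by the download and upload subclasses, like executorService,
       // so it does not have to be threaded through every method
       protected final CloseableRegistry closeableRegistry;

       RocksDBStateDataTransfer(int threadNum, CloseableRegistry closeableRegistry) {
           this.executorService = Executors.newFixedThreadPool(threadNum);
           this.closeableRegistry = closeableRegistry;
       }

       @Override
       public void close() {
           executorService.shutdownNow();
       }
   }
   ```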


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-23 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250319960
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -517,6 +516,10 @@ public void restore(Collection 
restoreState) throws Exception
 
LOG.info("Initializing RocksDB keyed state backend.");
 
+   if (LOG.isDebugEnabled()) {
 
 Review comment:
   this log seems to be a duplicate of the following one in try/if/else.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on issue #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-23 Thread GitBox
azagrebin commented on issue #7320: [FLINK-11171] Avoid concurrent usage of 
StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#issuecomment-456912423
 
 
   Thanks for the review @tillrohrmann ! I pushed a commit to address the 
concerns.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2019-01-23 Thread GitBox
alexeyt820 commented on issue #6615: [FLINK-8354] [flink-connectors] Add 
ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#issuecomment-456910348
 
 
   Weird: the build is fine locally using 1.8.0_181-b13, but fails on Travis with 
1.8.0_151-b12:
   
   >17:38:32.202 [ERROR] 
/home/travis/build/apache/flink/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java:[105,71]
 cannot infer type arguments for 
org.apache.kafka.clients.consumer.ConsumerRecord<>
   
   for the following lines:
   ```
final ConsumerRecord<byte[], byte[]> consumerRecord = new 
ConsumerRecord<>(
"topic#1", 3, serializedKey, serializedValue, 4L);
   
   ```
   
   Any ideas?
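   One thing I might try is spelling out the type arguments instead of relying 
on the diamond, in case the older javac's inference is the culprit (an untested 
guess):
   ```
final ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<byte[], byte[]>(
"topic#1", 3, serializedKey, serializedValue, 4L);
   ```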


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11419) StreamingFileSink fails to recover after taskmanager failure

2019-01-23 Thread Edward Rojas (JIRA)
Edward Rojas created FLINK-11419:


 Summary: StreamingFileSink fails to recover after taskmanager 
failure
 Key: FLINK-11419
 URL: https://issues.apache.org/jira/browse/FLINK-11419
 Project: Flink
  Issue Type: Bug
  Components: filesystem-connector
Affects Versions: 1.7.1
Reporter: Edward Rojas


If a job with a StreamingFileSink sending data to HDFS is running in a cluster 
with multiple taskmanagers and the taskmanager executing the job goes down 
(for some reason), the job fails on recovery with "Missing data in tmp file" 
because it is not able to perform a truncate on the file.

Here is the full stack trace:
{code:java}
java.io.IOException: Missing data in tmp file: 
hdfs://path/to/hdfs/2019-01-20/.part-0-0.inprogress.823f9c20-3594-4fe3-ae8c-f57b6c35e191
at 
org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:93)
at 
org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:745)
Caused by: 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
 Failed to TRUNCATE_FILE 
/path/to/hdfs/2019-01-20/.part-0-0.inprogress.823f9c20-3594-4fe3-ae8c-f57b6c35e191
 for DFSClient_NONMAPREDUCE_-2103482360_62 on x.xxx.xx.xx because this file 
lease is currently owned by DFSClient_NONMAPREDUCE_1834204750_59 on x.xx.xx.xx
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3190)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInternal(FSNamesystem.java:2282)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInt(FSNamesystem.java:2228)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2198)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1056)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:622)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)

at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
at org.apache.hadoop.ipc.Client.call(Client.java:1435)
at org.apache.hadoop.ipc.Client.call(Client.java:1345)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
at 

[GitHub] tweise merged pull request #7559: [FLINK-11048] Mark new RemoteStreamEnvironment constructor PublicEvolving

2019-01-23 Thread GitBox
tweise merged pull request #7559: [FLINK-11048] Mark new 
RemoteStreamEnvironment constructor PublicEvolving
URL: https://github.com/apache/flink/pull/7559
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11282) Merge StreamRecordWriter into RecordWriter

2019-01-23 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-11282.
--
Resolution: Fixed

merged commit b12d392 into apache:master

> Merge StreamRecordWriter into RecordWriter
> --
>
> Key: FLINK-11282
> URL: https://issues.apache.org/jira/browse/FLINK-11282
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> {{StreamRecordWriter}}, which extends {{RecordWriter}}, is only used for 
> streaming jobs. The only difference in {{StreamRecordWriter}} is that it 
> maintains the {{OutputFlusher}} thread, which can be migrated into 
> {{RecordWriter}} because the {{flushAlways}} property in {{RecordWriter}} is 
> related to the {{OutputFlusher}}.
> To do so, we can introduce the special {{BroadcastRecordWriter}} which 
> extends {{RecordWriter}} for improving broadcast selector for 
> [FLINK-10662|https://issues.apache.org/jira/browse/FLINK-10662] . And the 
> {{RecordWriter}} division is unified for both streaming and batch jobs.
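> The resulting layout would roughly be (a sketch based on the description 
> above, not the merged code):
> {code:java}
> public class RecordWriter<T extends IOReadableWritable> {
>     // now also maintains the OutputFlusher thread that StreamRecordWriter used to own
> }
>
> public class BroadcastRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
>     // specialized writer for broadcast emission (see FLINK-10662)
> }
> {code}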



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski merged pull request #7438: [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter

2019-01-23 Thread GitBox
pnowojski merged pull request #7438: [FLINK-11282][network] Merge 
StreamRecordWriter into RecordWriter
URL: https://github.com/apache/flink/pull/7438
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-11397) Speed up initialization of AbstractStreamOperatorTestHarness

2019-01-23 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750256#comment-16750256
 ] 

Piotr Nowojski edited comment on FLINK-11397 at 1/23/19 5:34 PM:
-

[~Tison] thanks for reporting the issue. Can you write up how you investigated 
it? Don't get me wrong, I'm a big opponent of using mockito for things like 
that, and it would be very valuable on its own to replace this {{mockTask = 
mock(StreamTask.class)}} with a proper mock or some dummy implementation, but I 
couldn't fully reproduce your findings.

I wrote a simple micro benchmark for both {{mock(StreamTask.class)}} and the {{new 
OneInputStreamOperatorTestHarness()}} constructor, and it seemed like the first one 
takes under 0.01 ms to complete. The second one is significantly slower, somewhere 
between 0.1 and 1 ms, but I couldn't get a more precise number, nor could I 
investigate/profile what's taking so much time, because apparently 
{{new OneInputStreamOperatorTestHarness()}} is leaking some resources and I was 
getting:

{code:java}
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.<init>(IOManagerAsync.java:88)
at 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.<init>(IOManagerAsync.java:69)
at 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.<init>(IOManagerAsync.java:60)
at 
org.apache.flink.runtime.operators.testutils.MockEnvironment.<init>(MockEnvironment.java:144)
at 
org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder.build(MockEnvironmentBuilder.java:126)
at 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.<init>(AbstractStreamOperatorTestHarness.java:164)
at 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.<init>(OneInputStreamOperatorTestHarness.java:92)
at 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.<init>(OneInputStreamOperatorTestHarness.java:83)
at 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.<init>(OneInputStreamOperatorTestHarness.java:75)
{code}

Regardless of that, 1 ms is still pretty far away from "a few seconds".

But as I wrote above, even ignoring the speed-up I would be more than happy to 
review/merge the code that gets rid of this mockito mock :)
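For reference, a sketch of such a micro benchmark (simplified, not the exact 
code; the iteration count is arbitrary):
{code:java}
long start = System.nanoTime();
for (int i = 0; i < 1_000; i++) {
    StreamTask<?, ?> mockTask = org.mockito.Mockito.mock(StreamTask.class);
}
// average cost per mock() call
long nanosPerCall = (System.nanoTime() - start) / 1_000;
System.out.println("mock(StreamTask.class): " + nanosPerCall / 1_000_000.0 + " ms per call");
{code}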


was (Author: pnowojski):
[~Tison] thanks for reporting the issue. Can you tell how you investigated 
it? Don't get me wrong, I'm a big opponent of using mockito for things like 
that, and it would be very valuable on its own to replace this {{mockTask = 
mock(StreamTask.class)}} with a proper mock or some dummy implementation, but I 
couldn't fully reproduce your findings.

I wrote a simple micro benchmark for both {{mock(StreamTask.class)}} and the {{new 
OneInputStreamOperatorTestHarness()}} constructor, and it seemed like the first one 
takes under 0.01 ms to complete. The second one is significantly slower, somewhere 
between 0.1 and 1 ms, but I couldn't get a more precise number, nor could I 
investigate/profile what's taking so much time, because apparently 
{{new OneInputStreamOperatorTestHarness()}} is leaking some resources and I was 
getting:

{code:java}
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.<init>(IOManagerAsync.java:88)
at 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.<init>(IOManagerAsync.java:69)
at 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.<init>(IOManagerAsync.java:60)
at 
org.apache.flink.runtime.operators.testutils.MockEnvironment.<init>(MockEnvironment.java:144)
at 
org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder.build(MockEnvironmentBuilder.java:126)
at 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.<init>(AbstractStreamOperatorTestHarness.java:164)
at 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.<init>(OneInputStreamOperatorTestHarness.java:92)
at 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.<init>(OneInputStreamOperatorTestHarness.java:83)
at 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.<init>(OneInputStreamOperatorTestHarness.java:75)
{code}

Regardless of that, 1 ms is still pretty far away from "a few seconds".

But as I wrote above, even ignoring the speed-up I would be more than happy to 
review/merge the code that gets rid of this mockito mock :)

> Speed up initialization of AbstractStreamOperatorTestHarness
> 
>
> Key: FLINK-11397
> URL: 

[jira] [Commented] (FLINK-11397) Speed up initialization of AbstractStreamOperatorTestHarness

2019-01-23 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750256#comment-16750256
 ] 

Piotr Nowojski commented on FLINK-11397:


[~Tison] thanks for reporting the issue. Can you tell how you investigated 
it? Don't get me wrong, I'm a big opponent of using mockito for things like 
that, and it would be very valuable on its own to replace this {{mockTask = 
mock(StreamTask.class)}} with a proper mock or some dummy implementation, but I 
couldn't fully reproduce your findings.

I wrote a simple micro benchmark for both {{mock(StreamTask.class)}} and the {{new 
OneInputStreamOperatorTestHarness()}} constructor, and it seemed like the first one 
takes under 0.01 ms to complete. The second one is significantly slower, somewhere 
between 0.1 and 1 ms, but I couldn't get a more precise number, nor could I 
investigate/profile what's taking so much time, because apparently 
{{new OneInputStreamOperatorTestHarness()}} is leaking some resources and I was 
getting:

{code:java}
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.<init>(IOManagerAsync.java:88)
at 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.<init>(IOManagerAsync.java:69)
at 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.<init>(IOManagerAsync.java:60)
at 
org.apache.flink.runtime.operators.testutils.MockEnvironment.<init>(MockEnvironment.java:144)
at 
org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder.build(MockEnvironmentBuilder.java:126)
at 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.<init>(AbstractStreamOperatorTestHarness.java:164)
at 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.<init>(OneInputStreamOperatorTestHarness.java:92)
at 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.<init>(OneInputStreamOperatorTestHarness.java:83)
at 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.<init>(OneInputStreamOperatorTestHarness.java:75)
{code}

Regardless of that, 1 ms is still pretty far away from "a few seconds".

But as I wrote above, even ignoring the speed-up I would be more than happy to 
review/merge the code that gets rid of this mockito mock :)

> Speed up initialization of AbstractStreamOperatorTestHarness
> 
>
> Key: FLINK-11397
> URL: https://issues.apache.org/jira/browse/FLINK-11397
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Priority: Major
>
> Currently Kafka connector tests are unbearably slow, which is tracked by 
> FLINK-10603. On investigation, the construction and initialization of 
> {{AbstractStreamOperatorTestHarness}} is quite slow.
> Walking down the code, it amazed me that {{mockTask = 
> mock(StreamTask.class);}} costs a few seconds to finish. If we can introduce 
> a test class instead of the mock framework, the situation might be eased.
> cc [~Zentol] [~pnowojski]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11418) Unable to build docs in Docker image

2019-01-23 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750185#comment-16750185
 ] 

Chesnay Schepler commented on FLINK-11418:
--

Did you try following the docker instructions that were added recently? (see 
[here|https://flink.apache.org/improve-website.html#update-or-extend-the-documentation])

> Unable to build docs in Docker image
> 
>
> Key: FLINK-11418
> URL: https://issues.apache.org/jira/browse/FLINK-11418
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.8.0
>Reporter: Robert Metzger
>Priority: Major
>
> Running 
> {code:java}
> cd flink/docs/docker
> ./run.sh{code}
>  
> And then in the container
> {code:java}
> Welcome to Apache Flink docs
> To build, execute
> ./build_docs.sh
> To watch and regenerate automatically
> ./build_docs.sh -p
> and access http://localhost:4000
> bash-4.4$ ./build_docs.sh -p
> Traceback (most recent call last):
> 2: from /usr/local/bin/bundle:23:in `<main>'
> 1: from /usr/share/rubygems/rubygems.rb:308:in `activate_bin_path'
> /usr/share/rubygems/rubygems.rb:289:in `find_spec_for_exe': can't find gem 
> bundler (>= 0.a) with executable bundle (Gem::GemNotFoundException){code}
> I believe there's something wrong.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9920) BucketingSinkFaultToleranceITCase fails on travis

2019-01-23 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750226#comment-16750226
 ] 

Till Rohrmann commented on FLINK-9920:
--

Another instance: https://api.travis-ci.org/v3/job/483415515/log.txt

> BucketingSinkFaultToleranceITCase fails on travis
> -
>
> Key: FLINK-9920
> URL: https://issues.apache.org/jira/browse/FLINK-9920
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.8.0
>
>
> https://travis-ci.org/zentol/flink/jobs/407021898
> {code}
> Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 13.082 sec 
> <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase
> runCheckpointedProgram(org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase)
>   Time elapsed: 5.696 sec  <<< FAILURE!
> java.lang.AssertionError: Read line does not match expected pattern.
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkFaultToleranceITCase.postSubmit(BucketingSinkFaultToleranceITCase.java:182)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11418) Unable to build docs in Docker image

2019-01-23 Thread Robert Metzger (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750221#comment-16750221
 ] 

Robert Metzger commented on FLINK-11418:


:) I have found a fix for the issue. Will open a PR soon.

> Unable to build docs in Docker image
> 
>
> Key: FLINK-11418
> URL: https://issues.apache.org/jira/browse/FLINK-11418
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.8.0
>Reporter: Robert Metzger
>Priority: Major
>
> Running 
> {code:java}
> cd flink/docs/docker
> ./run.sh{code}
>  
> And then in the container
> {code:java}
> Welcome to Apache Flink docs
> To build, execute
> ./build_docs.sh
> To watch and regenerate automatically
> ./build_docs.sh -p
> and access http://localhost:4000
> bash-4.4$ ./build_docs.sh -p
> Traceback (most recent call last):
> 2: from /usr/local/bin/bundle:23:in `<main>'
> 1: from /usr/share/rubygems/rubygems.rb:308:in `activate_bin_path'
> /usr/share/rubygems/rubygems.rb:289:in `find_spec_for_exe': can't find gem 
> bundler (>= 0.a) with executable bundle (Gem::GemNotFoundException){code}
> I believe there's something wrong.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2019-01-23 Thread GitBox
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r250282725
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
 ##
 @@ -55,4 +57,13 @@
 * @return null or the target topic
 */
String getTargetTopic(T element);
+
+   /**
+*
+* @param element The incoming element to be serialized
+* @return collection of headers (maybe empty)
+*/
+   default Iterable<Map.Entry<String, byte[]>> headers(T element) {
 
 Review comment:
   I can try to use ProducerRecord by adding a factory method to 
KeyedSerializationSchema:
   ```
   interface KeyedSerializationSchema<T> {
   ...
default ProducerRecord<byte[], byte[]> newProducerRecord(T element, 
byte[] serializedKey, byte[] serializedValue, Integer partition, String 
topic, Long timestamp) {
return new ProducerRecord<>(topic, partition, timestamp, 
serializedKey, serializedValue);
}
   }
   ```
   And then in a producer:
   ```
String targetTopic = schema.getTargetTopic(next);
if (targetTopic == null) {
targetTopic = defaultTopicId;
}
...
ProducerRecord<byte[], byte[]> record = schema.newProducerRecord(
  next, 
  serializedKey,
  serializedValue,
  flinkKafkaPartitioner != null ?
flinkKafkaPartitioner.partition(next, serializedKey, 
serializedValue, targetTopic, partitions) : null,
  targetTopic, 
  timestamp);
  ...
   ```
   Note that we can't deprecate getTargetTopic, serializeKey and 
serializeValue, because they are used by the partitioner as well.
   
   What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2019-01-23 Thread GitBox
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r250282725
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
 ##
 @@ -55,4 +57,13 @@
 * @return null or the target topic
 */
String getTargetTopic(T element);
+
+   /**
+*
+* @param element The incoming element to be serialized
+* @return collection of headers (maybe empty)
+*/
+   default Iterable<Map.Entry<String, byte[]>> headers(T element) {
 
 Review comment:
   I can try to use ProducerRecord by adding a factory method to 
KeyedSerializationSchema:
   ```suggestion
   interface KeyedSerializationSchema<T> {
   ...
default ProducerRecord<byte[], byte[]> newProducerRecord(T element, 
byte[] serializedKey, byte[] serializedValue, Integer partition, String 
topic, Long timestamp) {
return new ProducerRecord<>(topic, partition, timestamp, 
serializedKey, serializedValue);
}
   }
   ```
   And then in a producer:
   ```suggestion
String targetTopic = schema.getTargetTopic(next);
if (targetTopic == null) {
targetTopic = defaultTopicId;
}
...
ProducerRecord<byte[], byte[]> record = schema.newProducerRecord(
  next, 
  serializedKey,
  serializedValue,
  flinkKafkaPartitioner != null ?
flinkKafkaPartitioner.partition(next, serializedKey, 
serializedValue, targetTopic, partitions) : null,
  targetTopic, 
  timestamp);
  ...
   ```
   Note that we can't deprecate getTargetTopic, serializeKey and 
serializeValue, because they are used by the partitioner as well.
   
   What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11418) Unable to build docs in Docker image

2019-01-23 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750200#comment-16750200
 ] 

Chesnay Schepler commented on FLINK-11418:
--

oh, this is about the _flink_ docs, not flink-web :/

> Unable to build docs in Docker image
> 
>
> Key: FLINK-11418
> URL: https://issues.apache.org/jira/browse/FLINK-11418
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.8.0
>Reporter: Robert Metzger
>Priority: Major
>
> Running 
> {code:java}
> cd flink/docs/docker
> ./run.sh{code}
>  
> And then in the container
> {code:java}
> Welcome to Apache Flink docs
> To build, execute
> ./build_docs.sh
> To watch and regenerate automatically
> ./build_docs.sh -p
> and access http://localhost:4000
> bash-4.4$ ./build_docs.sh -p
> Traceback (most recent call last):
> 2: from /usr/local/bin/bundle:23:in `<main>'
> 1: from /usr/share/rubygems/rubygems.rb:308:in `activate_bin_path'
> /usr/share/rubygems/rubygems.rb:289:in `find_spec_for_exe': can't find gem 
> bundler (>= 0.a) with executable bundle (Gem::GemNotFoundException){code}
> I believe there's something wrong.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11418) Unable to build docs in Docker image

2019-01-23 Thread Robert Metzger (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750192#comment-16750192
 ] 

Robert Metzger commented on FLINK-11418:


Ah, I didn't know about these. The readme file in the "docs" directory did not 
mention them.

I'll check them out.

> Unable to build docs in Docker image
> 
>
> Key: FLINK-11418
> URL: https://issues.apache.org/jira/browse/FLINK-11418
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.8.0
>Reporter: Robert Metzger
>Priority: Major
>
> Running 
> {code:java}
> cd flink/docs/docker
> ./run.sh{code}
>  
> And then in the container
> {code:java}
> Welcome to Apache Flink docs
> To build, execute
> ./build_docs.sh
> To watch and regenerate automatically
> ./build_docs.sh -p
> and access http://localhost:4000
> bash-4.4$ ./build_docs.sh -p
> Traceback (most recent call last):
> 2: from /usr/local/bin/bundle:23:in `<main>'
> 1: from /usr/share/rubygems/rubygems.rb:308:in `activate_bin_path'
> /usr/share/rubygems/rubygems.rb:289:in `find_spec_for_exe': can't find gem 
> bundler (>= 0.a) with executable bundle (Gem::GemNotFoundException){code}
> I believe there's something wrong.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] azagrebin commented on a change in pull request #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base

2019-01-23 Thread GitBox
azagrebin commented on a change in pull request #7526: [FLINK-11353][tests] 
Port JobManagerHAJobGraphRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7526#discussion_r250231423
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
 ##
 @@ -225,31 +206,82 @@ public static JobGraph createNonEmptyJobGraph() {
return jobGraph;
}
 
-   private static class HATestingDispatcher extends TestingDispatcher {
+   @Nonnull
+   private HATestingDispatcher 
createDispatcher(TestingHighAvailabilityServices highAvailabilityServices, 
Queue<DispatcherId> fencingTokens) throws Exception {
+   return createDispatcher(highAvailabilityServices, 
fencingTokens, new TestingJobManagerRunnerFactory(new CompletableFuture<>(), 
new CompletableFuture<>(), CompletableFuture.completedFuture(null)));
+   }
 
-   @Nonnull
-   private final BlockingQueue<DispatcherId> fencingTokens;
+   @Nonnull
+   private HATestingDispatcher createDispatcher(
+   TestingHighAvailabilityServices highAvailabilityServices,
+   @Nullable Queue<DispatcherId> fencingTokens,
+   Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) 
throws Exception {
+   final Configuration configuration = new Configuration();
 
-   HATestingDispatcher(RpcService rpcService, String endpointId, 
Configuration configuration, HighAvailabilityServices highAvailabilityServices, 
ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, 
HeartbeatServices heartbeatServices, JobManagerMetricGroup 
jobManagerMetricGroup, @Nullable String metricQueryServicePath, 
ArchivedExecutionGraphStore archivedExecutionGraphStore, 
JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler 
fatalErrorHandler, @Nonnull BlockingQueue<DispatcherId> fencingTokens) throws 
Exception {
-   super(rpcService, endpointId, configuration, 
highAvailabilityServices, resourceManagerGateway, blobServer, 
heartbeatServices, jobManagerMetricGroup, metricQueryServicePath, 
archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler);
-   this.fencingTokens = fencingTokens;
-   }
+   return new HATestingDispatcher(
+   rpcService,
+   UUID.randomUUID().toString(),
+   configuration,
+   highAvailabilityServices,
+   new TestingResourceManagerGateway(),
+   new BlobServer(configuration, new VoidBlobStore()),
+   new HeartbeatServices(1000L, 1000L),
+   
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+   null,
+   new MemoryArchivedExecutionGraphStore(),
+   jobManagerRunnerFactory,
+   testingFatalErrorHandler,
+   fencingTokens);
+   }
 
-   @VisibleForTesting
-   CompletableFuture<Integer> getNumberJobs(Time timeout) {
-   return callAsyncWithoutFencing(
-   () -> listJobs(timeout).get().size(),
-   timeout);
+   private static class HATestingDispatcher extends TestingDispatcher {
+
+   @Nullable
+   private final Queue<DispatcherId> fencingTokens;
+
+   HATestingDispatcher(
+   RpcService rpcService,
+   String endpointId,
+   Configuration configuration,
+   HighAvailabilityServices 
highAvailabilityServices,
+   ResourceManagerGateway resourceManagerGateway,
+   BlobServer blobServer,
+   HeartbeatServices heartbeatServices,
+   JobManagerMetricGroup jobManagerMetricGroup,
+   @Nullable String metricQueryServicePath,
+   ArchivedExecutionGraphStore 
archivedExecutionGraphStore,
+   JobManagerRunnerFactory jobManagerRunnerFactory,
+   FatalErrorHandler fatalErrorHandler,
+   @Nullable Queue<DispatcherId> fencingTokens) 
throws Exception {
+   super(
+   rpcService,
+   endpointId,
+   configuration,
+   highAvailabilityServices,
+   resourceManagerGateway,
+   blobServer,
+   heartbeatServices,
+   jobManagerMetricGroup,
+   metricQueryServicePath,
+   archivedExecutionGraphStore,
+ 

[jira] [Assigned] (FLINK-11362) Check and port TaskManagerComponentsStartupShutdownTest to new code base if necessary

2019-01-23 Thread boshu Zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

boshu Zheng reassigned FLINK-11362:
---

Assignee: (was: boshu Zheng)

> Check and port TaskManagerComponentsStartupShutdownTest to new code base if 
> necessary
> -
>
> Key: FLINK-11362
> URL: https://issues.apache.org/jira/browse/FLINK-11362
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Priority: Major
>
> Check and port {{TaskManagerComponentsStartupShutdownTest}} to new code base 
> if necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11418) Unable to build docs in Docker image

2019-01-23 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-11418:
--

 Summary: Unable to build docs in Docker image
 Key: FLINK-11418
 URL: https://issues.apache.org/jira/browse/FLINK-11418
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.8.0
Reporter: Robert Metzger


Running 
{code:java}
cd flink/docs/docker
./run.sh{code}
 

And then in the container
{code:java}
Welcome to Apache Flink docs
To build, execute
./build_docs.sh
To watch and regenerate automatically
./build_docs.sh -p
and access http://localhost:4000

bash-4.4$ ./build_docs.sh -p
Traceback (most recent call last):
2: from /usr/local/bin/bundle:23:in `<main>'
1: from /usr/share/rubygems/rubygems.rb:308:in `activate_bin_path'
/usr/share/rubygems/rubygems.rb:289:in `find_spec_for_exe': can't find gem 
bundler (>= 0.a) with executable bundle (Gem::GemNotFoundException){code}
I believe there's something wrong.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2019-01-23 Thread GitBox
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r250247173
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
 ##
 @@ -55,4 +57,13 @@
 * @return null or the target topic
 */
String getTargetTopic(T element);
+
+   /**
+*
+* @param element The incoming element to be serialized
+* @return collection of headers (maybe empty)
+*/
+   default Iterable<Map.Entry<String, byte[]>> headers(T element) {
 
 Review comment:
   Not sure how calling one constructor can be easier than calling another... 
Map.Entry is more convenient when the element represents its headers as a Map 
(or Guava's Multimap); in that case the implementation is simply 
element.entrySet().
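
   For illustration, a minimal self-contained sketch of that Map-backed case 
(the element type and field names below are made up, not code from this PR), 
where headers(element) boils down to entrySet():
{code:java}
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HeadersSketch {

	// Hypothetical element type that carries its Kafka headers as a Map.
	static class EventWithHeaders {
		final byte[] payload;
		final Map<String, byte[]> headers = new HashMap<>();

		EventWithHeaders(byte[] payload) {
			this.payload = payload;
		}
	}

	// Stand-in for the proposed default method: when the element already
	// stores its headers as a Map, the implementation is just entrySet().
	static Iterable<Map.Entry<String, byte[]>> headers(EventWithHeaders element) {
		return element.headers.entrySet();
	}

	public static void main(String[] args) {
		EventWithHeaders event = new EventWithHeaders(new byte[0]);
		event.headers.put("trace-id", "abc".getBytes(StandardCharsets.UTF_8));
		for (Map.Entry<String, byte[]> header : headers(event)) {
			System.out.println(header.getKey() + " -> "
				+ new String(header.getValue(), StandardCharsets.UTF_8));
		}
	}
}
{code}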


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on issue #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base

2019-01-23 Thread GitBox
tillrohrmann commented on issue #7546: [FLINK-11390][tests] Port 
YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base
URL: https://github.com/apache/flink/pull/7546#issuecomment-456836215
 
 
   As @zentol pointed out, it could work to use the metrics to query the number 
of restarts.
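
   For illustration, a hedged sketch of that idea, assuming the job-level 
fullRestarts metric is exposed via the REST API (host, port and job id below 
are made up):
{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class RestartCountProbe {
	public static void main(String[] args) throws Exception {
		String jobId = "0123456789abcdef0123456789abcdef"; // hypothetical job id
		URL url = new URL("http://localhost:8081/jobs/" + jobId + "/metrics?get=fullRestarts");
		try (BufferedReader reader = new BufferedReader(
				new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
			// Expected shape, if the metric exists: [{"id":"fullRestarts","value":"<n>"}]
			System.out.println(reader.readLine());
		}
	}
}
{code}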


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] 
Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base
URL: https://github.com/apache/flink/pull/7546#discussion_r250224836
 
 

 ##
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 ##
 @@ -187,178 +188,146 @@ public void perJobYarnClusterOffHeap() throws 
IOException {
}
 
/**
-* Test TaskManager failure and also if the vcores are set correctly 
(see issue FLINK-2213).
+* Starts a session cluster on YARN, and submits a streaming job.
+*
+* Tests
+* <ul>
+* <li>if a custom YARN application name can be set from the command line,</li>
+* <li>if the number of TaskManager slots can be set from the command line,</li>
+* <li>if dynamic properties from the command line are set,</li>
+* <li>if the vcores are set correctly (FLINK-2213),</li>
+* <li>if jobmanager hostname/port are shown in web interface (FLINK-1902)</li>
+* </ul>
+*
+* <b>Hint:</b> If you think it is a good idea to add more 
assertions to this test, think again!
 */
-   @Test(timeout = 100000) // timeout after 100 seconds
-   public void testTaskManagerFailure() throws Exception {
-   assumeTrue("The new mode does not start TMs upfront.", 
!isNewMode);
-   LOG.info("Starting testTaskManagerFailure()");
-   Runner runner = startWithArgs(new String[]{"-j", 
flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
-   "-n", "1",
+   @Test(timeout = 100_000)
+   public void 
testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots()
 throws Exception {
+   final Runner yarnSessionClusterRunner = startWithArgs(new 
String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", 
flinkLibFolder.getAbsolutePath(),
"-jm", "768m",
"-tm", "1024m",
"-s", "3", // set the slots 3 to check if the 
vCores are set properly!
"-nm", "customName",
"-Dfancy-configuration-value=veryFancy",
"-Dyarn.maximum-failed-containers=3",
"-D" + YarnConfigOptions.VCORES.key() + "=2"},
-   "Number of connected TaskManagers changed to 1. Slots 
available: 3",
+   "Flink JobManager is now running on ",
RunTypes.YARN_SESSION);
 
-   Assert.assertEquals(2, getRunningContainers());
+   final String logs = outContent.toString();
+   final String hostname = parseJobManagerHostname(logs);
+   LOG.info("Extracted hostname: {}", hostname);
 
-   //  Test if JobManager web interface is 
accessible ---
+   final ApplicationReport applicationReport = 
getOnlyApplicationReport();
+   final String restApiBaseUrl = 
normalizeTrackingUrl(applicationReport.getTrackingUrl());
+   LOG.info("Got application URL from YARN {}", restApiBaseUrl);
 
-   final YarnClient yc = YarnClient.createYarnClient();
-   yc.init(YARN_CONFIGURATION);
-   yc.start();
+   submitJob("WindowJoin.jar");
+   waitForTaskManagerRegistration(restApiBaseUrl, 
Duration.ofMillis(30_000));
 
-   List apps = 
yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
-   Assert.assertEquals(1, apps.size()); // Only one running
-   ApplicationReport app = apps.get(0);
-   Assert.assertEquals("customName", app.getName());
-   String url = app.getTrackingUrl();
-   if (!url.endsWith("/")) {
-   url += "/";
-   }
-   if (!url.startsWith("http://")) {
-   url = "http://" + url;
-   }
-   LOG.info("Got application URL from YARN {}", url);
+   //
+   // Assert that custom YARN application name "customName" is set
+   //
+   assertEquals("customName", applicationReport.getName());
 
-   String response = TestBaseUtils.getFromHTTP(url + 
"taskmanagers/");
+   //
+   // Assert the number of TaskManager slots are set
+   //
+   assertNumberOfSlotsPerTask(restApiBaseUrl, 3);
 
-   JsonNode parsedTMs = new ObjectMapper().readTree(response);
-   ArrayNode taskManagers = (ArrayNode) 
parsedTMs.get("taskmanagers");
-   Assert.assertNotNull(taskManagers);
-   Assert.assertEquals(1, taskManagers.size());
-   Assert.assertEquals(3, 

[GitHub] tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] 
Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base
URL: https://github.com/apache/flink/pull/7546#discussion_r250225736
 
 

 ##
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 ##
 @@ -187,178 +188,146 @@ public void perJobYarnClusterOffHeap() throws 
IOException {
}
 
/**
-* Test TaskManager failure and also if the vcores are set correctly 
(see issue FLINK-2213).
+* Starts a session cluster on YARN, and submits a streaming job.
+*
+* Tests
+* <ul>
+* <li>if a custom YARN application name can be set from the command line,</li>
+* <li>if the number of TaskManager slots can be set from the command line,</li>
+* <li>if dynamic properties from the command line are set,</li>
+* <li>if the vcores are set correctly (FLINK-2213),</li>
+* <li>if jobmanager hostname/port are shown in web interface (FLINK-1902)</li>
+* </ul>
+*
+* <b>Hint:</b> If you think it is a good idea to add more 
assertions to this test, think again!
 */
-   @Test(timeout = 100000) // timeout after 100 seconds
-   public void testTaskManagerFailure() throws Exception {
-   assumeTrue("The new mode does not start TMs upfront.", 
!isNewMode);
-   LOG.info("Starting testTaskManagerFailure()");
-   Runner runner = startWithArgs(new String[]{"-j", 
flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
-   "-n", "1",
+   @Test(timeout = 100_000)
+   public void 
testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots()
 throws Exception {
+   final Runner yarnSessionClusterRunner = startWithArgs(new 
String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", 
flinkLibFolder.getAbsolutePath(),
"-jm", "768m",
"-tm", "1024m",
"-s", "3", // set the slots 3 to check if the 
vCores are set properly!
"-nm", "customName",
"-Dfancy-configuration-value=veryFancy",
"-Dyarn.maximum-failed-containers=3",
"-D" + YarnConfigOptions.VCORES.key() + "=2"},
-   "Number of connected TaskManagers changed to 1. Slots 
available: 3",
+   "Flink JobManager is now running on ",
RunTypes.YARN_SESSION);
 
-   Assert.assertEquals(2, getRunningContainers());
+   final String logs = outContent.toString();
+   final String hostname = parseJobManagerHostname(logs);
+   LOG.info("Extracted hostname: {}", hostname);
 
-   //  Test if JobManager web interface is 
accessible ---
+   final ApplicationReport applicationReport = 
getOnlyApplicationReport();
+   final String restApiBaseUrl = 
normalizeTrackingUrl(applicationReport.getTrackingUrl());
+   LOG.info("Got application URL from YARN {}", restApiBaseUrl);
 
-   final YarnClient yc = YarnClient.createYarnClient();
-   yc.init(YARN_CONFIGURATION);
-   yc.start();
+   submitJob("WindowJoin.jar");
+   waitForTaskManagerRegistration(restApiBaseUrl, 
Duration.ofMillis(30_000));
 
-   List apps = 
yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
-   Assert.assertEquals(1, apps.size()); // Only one running
-   ApplicationReport app = apps.get(0);
-   Assert.assertEquals("customName", app.getName());
-   String url = app.getTrackingUrl();
-   if (!url.endsWith("/")) {
-   url += "/";
-   }
-   if (!url.startsWith("http://")) {
-   url = "http://" + url;
-   }
-   LOG.info("Got application URL from YARN {}", url);
+   //
+   // Assert that custom YARN application name "customName" is set
+   //
+   assertEquals("customName", applicationReport.getName());
 
-   String response = TestBaseUtils.getFromHTTP(url + 
"taskmanagers/");
+   //
+   // Assert the number of TaskManager slots are set
+   //
+   assertNumberOfSlotsPerTask(restApiBaseUrl, 3);
 
-   JsonNode parsedTMs = new ObjectMapper().readTree(response);
-   ArrayNode taskManagers = (ArrayNode) 
parsedTMs.get("taskmanagers");
-   Assert.assertNotNull(taskManagers);
-   Assert.assertEquals(1, taskManagers.size());
-   Assert.assertEquals(3, 

[GitHub] tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] 
Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base
URL: https://github.com/apache/flink/pull/7546#discussion_r250227506
 
 

 ##
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 ##
 @@ -124,24 +135,84 @@ public void testKillYarnSessionClusterEntrypoint() 
throws Exception {
 
final ApplicationId id = restClusterClient.getClusterId();
 
-   waitUntilJobIsRunning(restClusterClient, jobId, RETRY_TIMEOUT);
+   waitUntilJobIsRunning(restClusterClient, jobId);
 

killApplicationMaster(yarnClusterDescriptor.getYarnSessionClusterEntrypoint());
+   waitForApplicationAttempt(id, 2);
 
+   waitUntilJobIsRunning(restClusterClient, jobId);
+
+   killApplicationAndWait(id);
+   }
+
+   @Test
+   public void testJobRecoversAfterKillingTaskManager() throws Exception {
+   final RestClusterClient<ApplicationId> restClusterClient = 
deploySessionCluster(setupYarnClusterDescriptor());
+   final JobID jobId = submitJob(restClusterClient, 
createJobGraph());
+   final ApplicationId id = restClusterClient.getClusterId();
+   waitUntilJobIsRunning(restClusterClient, jobId);
+
+   stopTaskManagerContainer();
+   waitUntilJobIsRunning(restClusterClient, jobId);
 
 Review comment:
   Can it happen that the JobManager does not realize that a `TM` has died 
before we call `waitUntilJobIsRunning`? Is there an easy way to check whether 
we have restarted the job?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11249) FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-23 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750075#comment-16750075
 ] 

Piotr Nowojski commented on FLINK-11249:


Thanks for the response [~elevy]. That would be my guess & hope as well, but 
unfortunately we need to test for that.

> FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
> ---
>
> Key: FLINK-11249
> URL: https://issues.apache.org/jira/browse/FLINK-11249
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Affects Versions: 1.7.0, 1.7.1
>Reporter: Piotr Nowojski
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.2, 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As reported by a user on the mailing list "How to migrate Kafka Producer ?" 
> (on 18th December 2018), {{FlinkKafkaProducer011}} can not be migrated to 
> {{FlinkKafkaProducer}} and the same problem can occur in the future Kafka 
> producer versions/refactorings.
> The issue is that the {{ListState<NextTransactionalIdHint> 
> FlinkKafkaProducer#nextTransactionalIdHintState}} field is serialized using 
> Java serializers and this is causing problems/collisions on 
> {{FlinkKafkaProducer011.NextTransactionalIdHint}}  vs
> {{FlinkKafkaProducer.NextTransactionalIdHint}}.
> To fix that we probably need to release new versions of those classes that 
> will rewrite/upgrade this state field to a new one that doesn't rely on 
> Java serialization. After this, we could drop the support for the old field 
> and that in turn will allow users to upgrade from 0.11 connector to the 
> universal one.
> One bright side is that technically speaking our {{FlinkKafkaProducer011}} 
> has the same compatibility matrix as the universal one (it's also forward & 
> backward compatible with the same Kafka versions), so for the time being 
> users can stick to {{FlinkKafkaProducer011}}.
> FYI [~tzulitai] [~yanghua]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] azagrebin commented on issue #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-23 Thread GitBox
azagrebin commented on issue #7188: [FLINK-10473][State TTL] TTL state 
incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#issuecomment-456828901
 
 
   Thanks for the review, @StefanRRichter! I activated this feature in the 
existing e2e test. I also extended the unit test to check that the snapshot is 
not affected by modifications that happen concurrently after the snapshot has 
been triggered. Please have a look if you have time.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] igalshilman commented on issue #7557: [FLINK-11406] [core] Return INCOMPATIBLE when nested serializers arity don't match in CompositeTypeSerializerSnapshot

2019-01-23 Thread GitBox
igalshilman commented on issue #7557: [FLINK-11406] [core] Return INCOMPATIBLE 
when nested serializers arity don't match in CompositeTypeSerializerSnapshot
URL: https://github.com/apache/flink/pull/7557#issuecomment-456826190
 
 
   LGTM  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r250218492
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import 
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import java.util.Optional;
+
+abstract class RocksDBSnapshotTransformFactoryAdaptor implements 
StateSnapshotTransformFactory {
+   final StateSnapshotTransformFactory snapshotTransformFactory;
+
+   
RocksDBSnapshotTransformFactoryAdaptor(StateSnapshotTransformFactory 
snapshotTransformFactory) {
+   this.snapshotTransformFactory = snapshotTransformFactory;
+   }
+
+   @Override
+   public Optional> 
createForDeserializedState() {
+   throw new UnsupportedOperationException("Only serialized state 
filtering is supported in RocksDB backend");
+   }
+
+   @SuppressWarnings("unchecked")
+   static  StateSnapshotTransformFactory 
wrapStateSnapshotTransformFactory(
+   StateDescriptor stateDesc,
+   StateSnapshotTransformFactory snapshotTransformFactory) {
+   if (stateDesc instanceof ListStateDescriptor) {
+   TypeSerializer elementSerializer =  
((ListStateDescriptor) stateDesc).getElementSerializer();
 
 Review comment:
   Also if the `TypeSerializer` is stateful, we might run into problems because 
we are sharing the elementSerializer across all `StateSnapshotTransformers`.
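
   One way around that, sketched under the assumption that per-transformer 
serializer copies are acceptable (the factory class below is made up, not the 
class under review): give every transformer its own instance via 
TypeSerializer#duplicate(), which deep-copies stateful serializers.
{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializer;

final class PerTransformerSerializerFactory<T> {

	private final TypeSerializer<T> elementSerializer;

	PerTransformerSerializerFactory(TypeSerializer<T> elementSerializer) {
		this.elementSerializer = elementSerializer;
	}

	// Each concurrently running snapshot/transformer gets an independent copy;
	// duplicate() may return `this` only when the serializer is stateless.
	TypeSerializer<T> serializerForNewTransformer() {
		return elementSerializer.duplicate();
	}
}
{code}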


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r250212255
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import 
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import java.util.Optional;
+
+abstract class RocksDBSnapshotTransformFactoryAdaptor implements 
StateSnapshotTransformFactory {
+   final StateSnapshotTransformFactory snapshotTransformFactory;
+
+   
RocksDBSnapshotTransformFactoryAdaptor(StateSnapshotTransformFactory 
snapshotTransformFactory) {
+   this.snapshotTransformFactory = snapshotTransformFactory;
+   }
+
+   @Override
+   public Optional> 
createForDeserializedState() {
+   throw new UnsupportedOperationException("Only serialized state 
filtering is supported in RocksDB backend");
+   }
+
+   @SuppressWarnings("unchecked")
+   static  StateSnapshotTransformFactory 
wrapStateSnapshotTransformFactory(
+   StateDescriptor stateDesc,
+   StateSnapshotTransformFactory snapshotTransformFactory) {
+   if (stateDesc instanceof ListStateDescriptor) {
+   TypeSerializer elementSerializer =  
((ListStateDescriptor) stateDesc).getElementSerializer();
 
 Review comment:
   I would like to hear @tzulitai's opinion on how we obtain the element 
serializer here. If I understood it correctly, we should obtain it from 
the meta state information because it might have been reconfigured.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r250210160
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotTransformerTest.java
 ##
 @@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import 
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.RunnableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+class StateSnapshotTransformerTest {
+   private final AbstractKeyedStateBackend backend;
+   private final BlockerCheckpointStreamFactory streamFactory;
+   private final StateSnapshotTransformFactory snapshotTransformFactory;
+
+   StateSnapshotTransformerTest(
+   AbstractKeyedStateBackend backend,
+   BlockerCheckpointStreamFactory streamFactory) {
+
+   this.backend = backend;
+   this.streamFactory = streamFactory;
+   this.snapshotTransformFactory = 
SingleThreadAccessCheckingSnapshotTransformFactory.create();
+   }
+
+   void testNonConcurrentSnapshotTransformerAccess() throws Exception {
+   List testStates = Arrays.asList(
+   new TestValueState(),
+   new TestListState(),
+   new TestMapState()
+   );
+
+   for (TestState state : testStates) {
+   for (int i = 0; i < 100; i++) {
+   backend.setCurrentKey(i);
+   state.setToRandomValue();
+   }
+
+   CheckpointOptions checkpointOptions = 
CheckpointOptions.forCheckpointWithDefaultLocation();
+
+   RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshot1 =
+   backend.snapshot(1L, 0L, streamFactory, 
checkpointOptions);
+
+   RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshot2 =
+   backend.snapshot(2L, 0L, streamFactory, 
checkpointOptions);
+
+   Thread runner1 = new Thread(snapshot1, "snapshot1");
+   runner1.start();
+   Thread runner2 = new Thread(snapshot2, "snapshot2");
+   runner2.start();
+
+   runner1.join();
+   runner2.join();
+
+   snapshot1.get();
+   snapshot2.get();
+   }
+   }
+
+   private abstract class TestState {
+   final Random rnd;
+
+   private TestState() {
+   this.rnd = new Random();
+   }
+
+   abstract void setToRandomValue() throws Exception;
+
+   String getRandomString() {
+   return StringUtils.getRandomString(rnd, 5, 10);
+   }
+   }
+
+   private class TestValueState extends TestState {
+   private final InternalValueState state;
+
+   private TestValueState() throws Exception {
+   this.state = backend.createInternalState(
+   VoidNamespaceSerializer.INSTANCE,
+   new ValueStateDescriptor<>("TestValueState", 
StringSerializer.INSTANCE),
+

[GitHub] asfgit closed pull request #7569: [hotfix] [javadocs] fix typo in JobManagerRunner

2019-01-23 Thread GitBox
asfgit closed pull request #7569: [hotfix] [javadocs] fix typo in 
JobManagerRunner
URL: https://github.com/apache/flink/pull/7569
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

2019-01-23 Thread xueyu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750025#comment-16750025
 ] 

xueyu commented on FLINK-11046:
---

Hi [~dawidwys] [~tzulitai], I don't currently know how to make the 
Elasticsearch 6 response fail and thereby trigger the onFailure callback in 
the e2e test Elasticsearch6SinkExample.java. Do you have any suggestions? 
Thanks. 
I tried the following, but I'm not sure whether it is right:

{code:java}
private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
	Map<String, Object> json = new HashMap<>();
	json.put("data", element);

	if (element.startsWith("message #15")) {
		// deliberately build a broken request (null index/type/id) to provoke a failure
		return Requests.indexRequest()
			.index(null)
			.type(null)
			.id(null)
			.source(element);
	} else {
		return Requests.indexRequest()
			.index(parameterTool.getRequired("index"))
			.type(parameterTool.getRequired("type"))
			.id(element)
			.source(json);
	}
}
{code}


> ElasticSearch6Connector cause thread blocked when index failed with retry
> -
>
> Key: FLINK-11046
> URL: https://issues.apache.org/jira/browse/FLINK-11046
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector
>Affects Versions: 1.6.2
>Reporter: luoguohao
>Assignee: xueyu
>Priority: Major
>
> When I'm using the ES6 sink to index into Elasticsearch and the bulk process 
> catches an exception, I try to reindex the document by calling 
> `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` 
> method, but things go wrong. The calling thread gets stuck there, and in the 
> thread dump I saw that the `bulkprocessor` object was locked by another thread. 
> {code:java}
> public interface ActionRequestFailureHandler extends Serializable {
>  void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
> RequestIndexer indexer) throws Throwable;
> }
> {code}
> After reading the code behind `indexer.add(action)`, I found that 
> `synchronized` is required for each add operation.
> {code:java}
> private synchronized void internalAdd(DocWriteRequest request, @Nullable 
> Object payload) {
>   ensureOpen();
>   bulkRequest.add(request, payload);
>   executeIfNeeded();
> }
> {code}
> I also noticed that the `bulkprocessor` object is locked in the 
> bulk process thread as well. 
> The bulk process operation is in the following code:
> {code:java}
> public void execute(BulkRequest bulkRequest, long executionId) {
> Runnable toRelease = () -> {};
> boolean bulkRequestSetupSuccessful = false;
> try {
> listener.beforeBulk(executionId, bulkRequest);
> semaphore.acquire();
> toRelease = semaphore::release;
> CountDownLatch latch = new CountDownLatch(1);
> retry.withBackoff(consumer, bulkRequest, new 
> ActionListener<BulkResponse>() {
> @Override
> public void onResponse(BulkResponse response) {
> try {
> listener.afterBulk(executionId, bulkRequest, response);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> @Override
> public void onFailure(Exception e) {
> try {
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> }, Settings.EMPTY);
> bulkRequestSetupSuccessful = true;
>if (concurrentRequests == 0) {
>latch.await();
> }
> } catch (InterruptedException e) {
> Thread.currentThread().interrupt();
> logger.info(() -> new ParameterizedMessage("Bulk request {} has been 
> cancelled.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } catch (Exception e) {
> logger.warn(() -> new ParameterizedMessage("Failed to execute bulk 
> request {}.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> if (bulkRequestSetupSuccessful == false) {  // if we fail on 
> client.bulk() release the semaphore
> toRelease.run();
> }
> }
> }
> {code}
> As the red line I marked above shows, I think that's the reason why the retry 
> operation thread was blocked, because the bulk 

[jira] [Resolved] (FLINK-11348) Port YARNSessionCapacitySchedulerITCase#testClientStartup to new code base

2019-01-23 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-11348.
---
   Resolution: Fixed
Fix Version/s: 1.8.0

Fixed via 
https://github.com/apache/flink/commit/3f0b1dfc4da548b166bc3e6aaf4d694c52ddda7b

> Port YARNSessionCapacitySchedulerITCase#testClientStartup to new code base
> --
>
> Key: FLINK-11348
> URL: https://issues.apache.org/jira/browse/FLINK-11348
> Project: Flink
>  Issue Type: Sub-task
>Reporter: vinoyang
>Assignee: Gary Yao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Port {{YARNSessionCapacitySchedulerITCase#testClientStartup}} 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11409) Make `ProcessFunction`, `ProcessWindowFunction` and etc. pure interfaces

2019-01-23 Thread Kezhu Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kezhu Wang updated FLINK-11409:
---
Description: 
These functions impose no real requirements on implementing classes. It would 
be nicer to define them as interfaces rather than abstract classes, since an 
abstract class is intrusive and limits callers' use cases. For example, a 
client can't easily write an `AbstractFlinkRichFunction` to unify lifecycle 
management for all data processing functions.

I dug into the history of some of these functions and found that some were 
converted from interfaces to abstract classes because they needed default 
method implementations; for example, `ProcessFunction` and `CoProcessFunction` 
were converted to abstract classes in FLINK-4460, which predates -FLINK-72+42+-. 
After -FLINK-72+42+-, [Java 8 default 
methods|https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html]
 would be a better solution.

I also notice that some functions introduced after -FLINK-72+42+-, such as 
`ProcessJoinFunction`, are implemented as abstract classes. I think it would 
be better to establish a well-known principle to guide both API authors and 
callers of data processing functions.

Personally, I prefer interfaces for all exported function callbacks, for the 
reason given in the first paragraph.

Besides this, with `AbstractRichFunction` and interfaces for data processing 
functions, I think many rich data processing functions could be eliminated: 
they are plain classes that extend `AbstractRichFunction` and implement the 
data processing interfaces, so clients could write the same thing in one line 
of code with a clear intention of both data processing and lifecycle 
management.

The following is a possibly incomplete list of data processing functions 
currently implemented as abstract classes:
 * `ProcessFunction`, `KeyedProcessFunction`, `CoProcessFunction` and 
`ProcessJoinFunction`
 * `ProcessWindowFunction` and `ProcessAllWindowFunction`
 * `BaseBroadcastProcessFunction`, `BroadcastProcessFunction` and 
`KeyedBroadcastProcessFunction`

All of the above functions are annotated with `@PublicEvolving`, so making 
them interfaces won't break Flink's compatibility guarantee, but compatibility 
is still a big consideration in evaluating this proposal.

Any thoughts on this proposal? Please do comment.
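
To make the proposed shape concrete, a minimal sketch (made-up, simplified 
signatures; not the actual Flink API) of a process-style callback as an 
interface with a Java 8 default method:

{code:java}
import java.util.function.Consumer;

// Sketch only: the callback is an interface, and the optional hook gets a
// Java 8 default implementation instead of forcing an abstract base class
// on implementors, so they remain free to extend any class they like.
public interface SketchProcessFunction<I, O> {

	void processElement(I value, Consumer<O> out) throws Exception;

	// Optional hook: default methods keep the interface evolvable without
	// breaking existing implementations.
	default void onTimer(long timestamp, Consumer<O> out) throws Exception {
		// no-op by default
	}
}
{code}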

  was:
These functions impose no real requirements on implementing classes. It would 
be nicer to define them as interfaces rather than abstract classes, since an 
abstract class is intrusive and limits callers' use cases. For example, a 
client can't easily write an `AbstractFlinkRichFunction` to unify lifecycle 
management for all data processing functions.

I dug into the history of some of these functions and found that some were 
converted from interfaces to abstract classes because they needed default 
method implementations; for example, `ProcessFunction` and `CoProcessFunction` 
were converted to abstract classes in FLINK-4460, which predates FLINK-7274. 
After FLINK-7274, [Java 8 default 
methods|https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html]
 would be a better solution.

I also notice that some functions introduced after FLINK-7274, such as 
`ProcessJoinFunction`, are implemented as abstract classes. I think it would 
be better to establish a well-known principle to guide both API authors and 
callers of data processing functions.

Personally, I prefer interfaces for all exported function callbacks, for the 
reason given in the first paragraph.

Besides this, with `AbstractRichFunction` and interfaces for data processing 
functions, I think many rich data processing functions could be eliminated: 
they are plain classes that extend `AbstractRichFunction` and implement the 
data processing interfaces, so clients could write the same thing in one line 
of code with a clear intention of both data processing and lifecycle 
management.

The following is a possibly incomplete list of data processing functions 
currently implemented as abstract classes:
 * `ProcessFunction`, `KeyedProcessFunction`, `CoProcessFunction` and 
`ProcessJoinFunction`
 * `ProcessWindowFunction` and `ProcessAllWindowFunction`
 * `BaseBroadcastProcessFunction`, `BroadcastProcessFunction` and 
`KeyedBroadcastProcessFunction`

All of the above functions are annotated with `@PublicEvolving`, so making 
them interfaces won't break Flink's compatibility guarantee, but compatibility 
is still a big consideration in evaluating this proposal.

Any thoughts on this proposal? Please do comment.


> Make `ProcessFunction`, `ProcessWindowFunction` and etc. pure interfaces
> 
>
> Key: FLINK-11409
> URL: https://issues.apache.org/jira/browse/FLINK-11409
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Kezhu Wang
>Priority: Major

[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid 
concurrent usage of StateSnapshotTransformer
URL: https://github.com/apache/flink/pull/7320#discussion_r250206990
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotTransformers.java
 ##
 @@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import 
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.state.StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy.STOP_ON_FIRST_INCLUDED;
+
+/** Collection of common state snapshot transformers and their factories. */
+public class StateSnapshotTransformers {
 
 Review comment:
   Alright, makes sense. Thanks for the clarification.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rmetzger commented on issue #7569: [hotfix] [javadocs] fix typo in JobManagerRunner

2019-01-23 Thread GitBox
rmetzger commented on issue #7569: [hotfix] [javadocs] fix typo in 
JobManagerRunner
URL: https://github.com/apache/flink/pull/7569#issuecomment-456811678
 
 
   Thank you. Will merge.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11409) Make `ProcessFunction`, `ProcessWindowFunction` and etc. pure interfaces

2019-01-23 Thread Kezhu Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kezhu Wang updated FLINK-11409:
---
Description: 
These functions impose no real requirements on implementing classes. It would 
be nicer to define them as interfaces rather than abstract classes, since an 
abstract class is intrusive and limits callers' use cases. For example, a 
client can't easily write an `AbstractFlinkRichFunction` to unify lifecycle 
management for all data processing functions.

I dug into the history of some of these functions and found that some were 
converted from interfaces to abstract classes because they needed default 
method implementations; for example, `ProcessFunction` and `CoProcessFunction` 
were converted to abstract classes in FLINK-4460, which predates -FLINK-7242-. 
After -FLINK-7242-, [Java 8 default 
methods|https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html]
 would be a better solution.

I also notice that some functions introduced after -FLINK-7242-, such as 
`ProcessJoinFunction`, are implemented as abstract classes. I think it would 
be better to establish a well-known principle to guide both API authors and 
callers of data processing functions.

Personally, I prefer interfaces for all exported function callbacks, for the 
reason given in the first paragraph.

Besides this, with `AbstractRichFunction` and interfaces for data processing 
functions, I think many rich data processing functions could be eliminated: 
they are plain classes that extend `AbstractRichFunction` and implement the 
data processing interfaces, so clients could write the same thing in one line 
of code with a clear intention of both data processing and lifecycle 
management.

The following is a possibly incomplete list of data processing functions 
currently implemented as abstract classes:
 * `ProcessFunction`, `KeyedProcessFunction`, `CoProcessFunction` and 
`ProcessJoinFunction`
 * `ProcessWindowFunction` and `ProcessAllWindowFunction`
 * `BaseBroadcastProcessFunction`, `BroadcastProcessFunction` and 
`KeyedBroadcastProcessFunction`

All of the above functions are annotated with `@PublicEvolving`, so making 
them interfaces won't break Flink's compatibility guarantee, but compatibility 
is still a big consideration in evaluating this proposal.

Any thoughts on this proposal? Please do comment.

  was:
These functions impose no real requirements on implementing classes. It would 
be nicer to define them as interfaces rather than abstract classes, since an 
abstract class is intrusive and limits callers' use cases. For example, a 
client can't easily write an `AbstractFlinkRichFunction` to unify lifecycle 
management for all data processing functions.

I dug into the history of some of these functions and found that some were 
converted from interfaces to abstract classes because they needed default 
method implementations; for example, `ProcessFunction` and `CoProcessFunction` 
were converted to abstract classes in FLINK-4460, which predates -FLINK-72+42+-. 
After -FLINK-72+42+-, [Java 8 default 
methods|https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html]
 would be a better solution.

I also notice that some functions introduced after -FLINK-72+42+-, such as 
`ProcessJoinFunction`, are implemented as abstract classes. I think it would 
be better to establish a well-known principle to guide both API authors and 
callers of data processing functions.

Personally, I prefer interfaces for all exported function callbacks, for the 
reason given in the first paragraph.

Besides this, with `AbstractRichFunction` and interfaces for data processing 
functions, I think many rich data processing functions could be eliminated: 
they are plain classes that extend `AbstractRichFunction` and implement the 
data processing interfaces, so clients could write the same thing in one line 
of code with a clear intention of both data processing and lifecycle 
management.

The following is a possibly incomplete list of data processing functions 
currently implemented as abstract classes:
 * `ProcessFunction`, `KeyedProcessFunction`, `CoProcessFunction` and 
`ProcessJoinFunction`
 * `ProcessWindowFunction` and `ProcessAllWindowFunction`
 * `BaseBroadcastProcessFunction`, `BroadcastProcessFunction` and 
`KeyedBroadcastProcessFunction`

All of the above functions are annotated with `@PublicEvolving`, so making 
them interfaces won't break Flink's compatibility guarantee, but compatibility 
is still a big consideration in evaluating this proposal.

Any thoughts on this proposal? Please do comment.


> Make `ProcessFunction`, `ProcessWindowFunction` and etc. pure interfaces
> 
>
> Key: FLINK-11409
> URL: https://issues.apache.org/jira/browse/FLINK-11409
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Kezhu Wang
>Priority: 

[GitHub] tillrohrmann merged pull request #7538: [FLINK-11348][tests] Port testClientStartup to new codebase

2019-01-23 Thread GitBox
tillrohrmann merged pull request #7538: [FLINK-11348][tests] Port 
testClientStartup to new codebase
URL: https://github.com/apache/flink/pull/7538
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on issue #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base

2019-01-23 Thread GitBox
tillrohrmann commented on issue #7526: [FLINK-11353][tests] Port 
JobManagerHAJobGraphRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7526#issuecomment-456809149
 
 
   Thanks for the review @azagrebin. I've addressed your comments and moved 
`testPersistedJobGraphWhenDispatcherIsShutDown` to `DispatcherTest` since it is 
not HA related.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7526: [FLINK-11353][tests] 
Port JobManagerHAJobGraphRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7526#discussion_r250202116
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
 ##
 @@ -225,13 +243,78 @@ public static JobGraph createNonEmptyJobGraph() {
return jobGraph;
}
 
-   private static class HATestingDispatcher extends TestingDispatcher {
+   @Test
+   public void testPersistedJobGraphWhenDispatcherIsShutDown() throws 
Exception {
+   final JobGraph jobGraph = createNonEmptyJobGraph();
+   final TestingHighAvailabilityServices highAvailabilityServices 
= new TestingHighAvailabilityServices();
 
-   @Nonnull
-   private final BlockingQueue<DispatcherId> fencingTokens;
+   final InMemorySubmittedJobGraphStore submittedJobGraphStore = 
new InMemorySubmittedJobGraphStore();
+   
highAvailabilityServices.setSubmittedJobGraphStore(submittedJobGraphStore);
+
+   final TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
+   
highAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
+
+   
highAvailabilityServices.setJobMasterLeaderElectionService(jobGraph.getJobID(), 
new TestingLeaderElectionService());
+   highAvailabilityServices.setResourceManagerLeaderRetriever(new 
SettableLeaderRetrievalService());
+   highAvailabilityServices.setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory());
 
-   HATestingDispatcher(RpcService rpcService, String endpointId, 
Configuration configuration, HighAvailabilityServices highAvailabilityServices, 
ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, 
HeartbeatServices heartbeatServices, JobManagerMetricGroup 
jobManagerMetricGroup, @Nullable String metricQueryServicePath, 
ArchivedExecutionGraphStore archivedExecutionGraphStore, 
JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler 
fatalErrorHandler, @Nonnull BlockingQueue<DispatcherId> fencingTokens) throws 
Exception {
-   super(rpcService, endpointId, configuration, 
highAvailabilityServices, resourceManagerGateway, blobServer, 
heartbeatServices, jobManagerMetricGroup, metricQueryServicePath, 
archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler);
+   final HATestingDispatcher dispatcher = createDispatcher(
 
 Review comment:
   I think you're right and the test should be moved to `DispatcherTest` 
because it is not HA specific. Moreover, we should move `getNumberJobs` into 
`TestingDispatcher`.
   
   The annotation `VisibleForTesting` is not necessary since 
`HATestingDispatcher` is a testing class. Will remove it while moving the 
method over to `TestingDispatcher`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7526: [FLINK-11353][tests] 
Port JobManagerHAJobGraphRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7526#discussion_r250198790
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
 ##
 @@ -225,13 +243,78 @@ public static JobGraph createNonEmptyJobGraph() {
return jobGraph;
}
 
-   private static class HATestingDispatcher extends TestingDispatcher {
+   @Test
+   public void testPersistedJobGraphWhenDispatcherIsShutDown() throws 
Exception {
+   final JobGraph jobGraph = createNonEmptyJobGraph();
+   final TestingHighAvailabilityServices highAvailabilityServices 
= new TestingHighAvailabilityServices();
 
-   @Nonnull
-   private final BlockingQueue<DispatcherId> fencingTokens;
+   final InMemorySubmittedJobGraphStore submittedJobGraphStore = 
new InMemorySubmittedJobGraphStore();
+   
highAvailabilityServices.setSubmittedJobGraphStore(submittedJobGraphStore);
+
+   final TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
+   
highAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
+
+   
highAvailabilityServices.setJobMasterLeaderElectionService(jobGraph.getJobID(), 
new TestingLeaderElectionService());
+   highAvailabilityServices.setResourceManagerLeaderRetriever(new 
SettableLeaderRetrievalService());
+   highAvailabilityServices.setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory());
 
-   HATestingDispatcher(RpcService rpcService, String endpointId, 
Configuration configuration, HighAvailabilityServices highAvailabilityServices, 
ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, 
HeartbeatServices heartbeatServices, JobManagerMetricGroup 
jobManagerMetricGroup, @Nullable String metricQueryServicePath, 
ArchivedExecutionGraphStore archivedExecutionGraphStore, 
JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler 
fatalErrorHandler, @Nonnull BlockingQueue<DispatcherId> fencingTokens) throws 
Exception {
-   super(rpcService, endpointId, configuration, 
highAvailabilityServices, resourceManagerGateway, blobServer, 
heartbeatServices, jobManagerMetricGroup, metricQueryServicePath, 
archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler);
+   final HATestingDispatcher dispatcher = createDispatcher(
+   highAvailabilityServices,
+   Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE);
+
+   dispatcher.start();
+
+   try {
+   // grant leadership and submit a single job
+   final DispatcherId expectedDispatcherId = 
DispatcherId.generate();
+   
leaderElectionService.isLeader(expectedDispatcherId.toUUID()).get();
+
+   final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+   final CompletableFuture submissionFuture = 
dispatcherGateway.submitJob(jobGraph, timeout);
+   submissionFuture.get();
+   assertThat(dispatcher.getNumberJobs(timeout).get(), 
is(1));
+
+   dispatcher.shutDown();
+   dispatcher.getTerminationFuture().get();
+
+   
assertThat(submittedJobGraphStore.contains(jobGraph.getJobID()), is(true));
 
 Review comment:
   No, it won't. But in non-HA mode we usually instantiate a `StandaloneSubmittedJobGraphStore`, which is a no-op implementation.
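   
   To make the no-op behaviour concrete, here is a hedged sketch (hypothetical class name, simplified from the `SubmittedJobGraphStore` contract): every put is discarded, so nothing survives a dispatcher shutdown and a `contains`-style check would never find the job.
   
       class NoOpSubmittedJobGraphStore {
           void putJobGraph(SubmittedJobGraph jobGraph) {
               // intentionally empty: non-HA mode persists nothing
           }
           Collection<JobID> getJobIds() {
               // nothing was stored, so nothing can be recovered
               return Collections.emptyList();
           }
       }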




[GitHub] tillrohrmann commented on a change in pull request #7526: [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7526: [FLINK-11353][tests] 
Port JobManagerHAJobGraphRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7526#discussion_r250197888
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
 ##
 @@ -150,8 +154,22 @@ public void testGrantingRevokingLeadership() throws Exception {
 	}
 
 	@Nonnull
-	private HATestingDispatcher createHADispatcher(TestingHighAvailabilityServices highAvailabilityServices, BlockingQueue<DispatcherId> fencingTokens) throws Exception {
+	private HATestingDispatcher createDispatcher(TestingHighAvailabilityServices highAvailabilityServices, Queue<DispatcherId> fencingTokens) throws Exception {
 
 Review comment:
   Good point. Will do it.
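   
   As an aside, widening the parameter from `BlockingQueue` to `Queue` lets callers that never block on fencing tokens pass any queue implementation, e.g. (illustrative only):
   
       // an ArrayDeque suffices when the test only records tokens
       final Queue<DispatcherId> fencingTokens = new ArrayDeque<>();
       final HATestingDispatcher dispatcher = createDispatcher(highAvailabilityServices, fencingTokens);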




[GitHub] tillrohrmann commented on issue #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base

2019-01-23 Thread GitBox
tillrohrmann commented on issue #7541: [FLINK-11356][tests] Port 
JobManagerStartupTest to new code base
URL: https://github.com/apache/flink/pull/7541#issuecomment-456803334
 
 
   Thanks for the review @GJL. I've addressed your comments.




[GitHub] tillrohrmann commented on a change in pull request #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7541: [FLINK-11356][tests] 
Port JobManagerStartupTest to new code base
URL: https://github.com/apache/flink/pull/7541#discussion_r250195669
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerTest.java
 ##
 @@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for the {@link BlobServer}.
+ */
+public class BlobServerTest extends TestLogger {
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	/**
+	 * Tests that the {@link BlobServer} fails if the blob storage directory
+	 * cannot be created.
+	 */
+	@Test
+	public void testFailureIfStorageDirectoryCannotBeCreated() throws IOException {
+		assumeTrue(!OperatingSystem.isWindows()); // setWritable doesn't work on Windows.
 
 Review comment:
   Good point, Gary. I will move the assumption into the `createNonWritableDirectory` method and change it to `assumeFalse`.
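   
   A rough sketch of the resulting helper (assumed shape; the exact permission calls may differ in the final code, and `assumeFalse` would need an import from `org.junit.Assume`):
   
       private File createNonWritableDirectory() throws IOException {
           // setWritable doesn't work on Windows, so skip the test there
           assumeFalse(OperatingSystem.isWindows());
           final File directory = temporaryFolder.newFolder();
           assertTrue(directory.setWritable(false, false));
           return directory;
       }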




[jira] [Closed] (FLINK-11355) Check and port JobManagerProcessReapingTest if necessary to new code base

2019-01-23 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-11355.
-
   Resolution: Fixed
Fix Version/s: 1.8.0

Fixed via 
https://github.com/apache/flink/commit/1c4df9954e2ba103f09c4160ee918b625c8dc10d

> Check and port JobManagerProcessReapingTest if necessary to new code base
> -
>
> Key: FLINK-11355
> URL: https://issues.apache.org/jira/browse/FLINK-11355
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Check and port {{JobManagerProcessReapingTest}} if necessary to new code base.





[GitHub] tillrohrmann commented on a change in pull request #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7541: [FLINK-11356][tests] 
Port JobManagerStartupTest to new code base
URL: https://github.com/apache/flink/pull/7541#discussion_r250194572
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
 ##
 @@ -360,4 +368,32 @@ public void testConcurrentActorSystemCreation() throws Exception {
 			ExecutorUtils.gracefulShutdown(1L, TimeUnit.MILLISECONDS, executorService);
 		}
 	}
+
+	/**
+	 * Tests that the {@link ActorSystem} fails with an expressive exception
+	 * if it cannot be instantiated due to an occupied port.
+	 */
+	@Test
+	public void testActorSystemInstantiationFailureWhenPortOccupied() throws Exception {
+		ServerSocket portOccupier;
+		final int port;
+
+		try {
+			port = NetUtils.getAvailablePort();
+			portOccupier = new ServerSocket(port, 10, InetAddress.getByName("0.0.0.0"));
+		}
+		catch (Throwable t) {
+			// could not find free port, or open a connection there
 
 Review comment:
   I think you're right: this is not necessary if we don't preselect the `port`. Instead we should let the `ServerSocket` pick a free port itself via `ServerSocket(0, ...)`. Will change it accordingly.
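   
   For reference, a minimal sketch of the suggested change (passing port 0 asks the OS for a free ephemeral port, removing the race between `getAvailablePort()` and binding):
   
       final ServerSocket portOccupier = new ServerSocket(0, 10, InetAddress.getByName("0.0.0.0"));
       final int port = portOccupier.getLocalPort(); // the port the ActorSystem will now find occupied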




[GitHub] tillrohrmann merged pull request #7540: [FLINK-11355][tests] Remove JobManagerProcessReapingTest

2019-01-23 Thread GitBox
tillrohrmann merged pull request #7540: [FLINK-11355][tests] Remove 
JobManagerProcessReapingTest
URL: https://github.com/apache/flink/pull/7540
 
 
   




[GitHub] tillrohrmann commented on issue #7540: [FLINK-11355][tests] Remove JobManagerProcessReapingTest

2019-01-23 Thread GitBox
tillrohrmann commented on issue #7540: [FLINK-11355][tests] Remove 
JobManagerProcessReapingTest
URL: https://github.com/apache/flink/pull/7540#issuecomment-456800082
 
 
   Thanks for your review @azagrebin. Merging this PR.




[GitHub] tillrohrmann closed pull request #7524: [FLINK-11351][tests] Port JobManagerCleanupITCase to new code base

2019-01-23 Thread GitBox
tillrohrmann closed pull request #7524:  [FLINK-11351][tests] Port 
JobManagerCleanupITCase to new code base
URL: https://github.com/apache/flink/pull/7524
 
 
   




[jira] [Closed] (FLINK-11351) Port JobManagerCleanupITCase to new code base

2019-01-23 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-11351.
-
   Resolution: Fixed
Fix Version/s: 1.8.0

Fixed via 960feb4437c439084cfb317dd127645cfd176578

> Port JobManagerCleanupITCase to new code base
> -
>
> Key: FLINK-11351
> URL: https://issues.apache.org/jira/browse/FLINK-11351
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Port {{JobManagerCleanupITCase}} to new code base.





[GitHub] tillrohrmann commented on issue #7524: [FLINK-11351][tests] Port JobManagerCleanupITCase to new code base

2019-01-23 Thread GitBox
tillrohrmann commented on issue #7524:  [FLINK-11351][tests] Port 
JobManagerCleanupITCase to new code base
URL: https://github.com/apache/flink/pull/7524#issuecomment-456799536
 
 
   Thanks for the review @GJL. Merging this PR.



