Re: Memory size for network buffers

2018-11-12 Thread zhijiang
Hi Thomas,

The current calculation for the network buffer size is
"Math.min(taskmanager.network.memory.max,
Math.max(taskmanager.network.memory.min, fraction * totalMem))".
Based on your configuration below, the result is just 32768 bytes (4 buffers),
coming from taskmanager.network.memory.min.
If you want to configure a fixed amount of network buffer memory, you can set the
same value for the min and max parameters and ignore the fraction value in the
configuration.
BTW, you can specify a unit for these parameters, such as min: 32kb.
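
As a plain-Java sketch of that clamping logic (the total-memory value below is only a
placeholder, not what Flink actually derives on a TaskManager):

public class NetworkBufferSizing {
    public static void main(String[] args) {
        long min = 32_768L;               // taskmanager.network.memory.min
        long max = 1_073_741_824L;        // taskmanager.network.memory.max
        double fraction = 0.01;           // taskmanager.network.memory.fraction
        long segmentSize = 8_192L;        // taskmanager.memory.segment-size

        long totalMem = 512L << 20;       // placeholder total, 512 MiB
        long networkBytes = Math.min(max, Math.max(min, (long) (fraction * totalMem)));
        System.out.println(networkBytes / segmentSize + " buffers of " + segmentSize + " bytes");
        // If fraction * totalMem fell below 32768 bytes, the min would clamp the result
        // to 32768 bytes, i.e. 4 buffers of 8192 bytes each.
    }
}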

As for the thinking behind the min and max settings, it is difficult to know exactly
how many network buffers will be needed at TaskManager startup, or which kinds of
tasks will later be deployed to this TaskManager. For example, batch jobs can make
use of as many network buffers as the system can spare, while for streaming jobs the
spare buffers could be used elsewhere for possible improvements. In order to keep the
possibility of such future improvements open without changing the configuration
options, we retain the current parameters.

For your job I think you should increase the min value to get more network buffers;
the current 4 buffers are indeed not enough for common jobs.
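
For example (just an illustration of the fixed-size setting, not a concrete
recommendation for your job), something like:

taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 64mb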

Best,
Zhijiang
--
Sender:Thomas Weise 
Sent at:2018 Nov 13 (Tue) 13:11
Recipient:dev 
Subject:Memory size for network buffers

Hi,

I'm trying to understand the intention behind the size parameters for
network buffers, specifically max, min and fraction. The reason we are
looking at it is an attempt to make the memory allocation elastic, so that
memory is allocated according to the actual number of buffers required
(within a range), without the need to tune this for every deployment.

As of Flink 1.5, there are 3 parameters, but they all result in a fixed
allocation, which is not what we were looking for.

Here is an example just to illustrate it:

taskmanager.network.memory.fraction: 0.01
taskmanager.network.memory.min: 32768
taskmanager.network.memory.max: 1073741824
taskmanager.memory.segment-size: 8192

I wanted fraction to be out of the picture (but 0 isn't an acceptable
value).

I then set min to something tiny that my job will exceed and max to something
too large to reach. Unfortunately, that fails:

java.io.IOException: Insufficient number of network buffers: required 8,
but only 0 available. The total number of network buffers is currently set
to 4 of 8192 bytes each. You can increase this number by setting the
configuration keys 'taskmanager.network.memory.fraction',
'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.

So the question then is, why have min and max? Or is the intention to have
a different implementation in the future?

Thanks,
Thomas



[jira] [Created] (FLINK-10857) Conflict between JMX and Prometheus Metrics reporter

2018-11-12 Thread Truong Duc Kien (JIRA)
Truong Duc Kien created FLINK-10857:
---

 Summary: Conflict between JMX and Prometheus Metrics reporter
 Key: FLINK-10857
 URL: https://issues.apache.org/jira/browse/FLINK-10857
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.6.2
Reporter: Truong Duc Kien


When registering both the JMX and the Prometheus metrics reporters, the Prometheus
reporter will fail with an exception.

 
{code:java}
o.a.f.r.m.MetricRegistryImpl Error while registering metric.
java.lang.IllegalArgumentException: Invalid metric name: 
flink_jobmanager.Status.JVM.Memory.Mapped_Count
at 
org.apache.flink.shaded.io.prometheus.client.Collector.checkMetricName(Collector.java:182)
at 
org.apache.flink.shaded.io.prometheus.client.SimpleCollector.<init>(SimpleCollector.java:164)
at 
org.apache.flink.shaded.io.prometheus.client.Gauge.<init>(Gauge.java:68)
at 
org.apache.flink.shaded.io.prometheus.client.Gauge$Builder.create(Gauge.java:74)
at 
org.apache.flink.metrics.prometheus.AbstractPrometheusReporter.createCollector(AbstractPrometheusReporter.java:130)
at 
org.apache.flink.metrics.prometheus.AbstractPrometheusReporter.notifyOfAddedMetric(AbstractPrometheusReporter.java:106)
at 
org.apache.flink.runtime.metrics.MetricRegistryImpl.register(MetricRegistryImpl.java:329)
at 
org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:379)
at 
org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.gauge(AbstractMetricGroup.java:323)
at 
org.apache.flink.runtime.metrics.util.MetricUtils.instantiateMemoryMetrics(MetricUtils.java:231)
at 
org.apache.flink.runtime.metrics.util.MetricUtils.instantiateStatusMetrics(MetricUtils.java:100)
at 
org.apache.flink.runtime.metrics.util.MetricUtils.instantiateJobManagerMetricGroup(MetricUtils.java:68)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startClusterComponents(ClusterEntrypoint.java:342)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:233)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:191)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:176)
{code}
 

This is a small program to reproduce the problem:

https://github.com/dikei/flink-metrics-conflict-test
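
For reference, a reporter configuration along these lines (paraphrased from memory;
the repository above contains the actual reproduction) should describe the setup:

metrics.reporters: jmx, prom
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249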

 




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-11-12 Thread Zhang, Xuefu
Hi Piotr

I have extracted the API portion of the design and the Google doc is here.
Please review and provide your feedback.

Thanks,
Xuefu


--
Sender:Xuefu 
Sent at:2018 Nov 12 (Mon) 12:43
Recipient:Piotr Nowojski ; dev 
Cc:Bowen Li ; Shuyi Chen 
Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi Piotr,

That sounds good to me. Let's close all the open questions (there are a couple
of them) in the Google doc, and I should be able to quickly split it into the
three proposals as you suggested.

Thanks,
Xuefu


--
Sender:Piotr Nowojski 
Sent at:2018 Nov 9 (Fri) 22:46
Recipient:dev ; Xuefu 
Cc:Bowen Li ; Shuyi Chen 
Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi,

Yes, it seems like the best solution. Maybe someone else can also suggest whether
we can split it further? Maybe changes to the interfaces in one doc, reading
from the Hive metastore in another, and finally storing our meta information in the
Hive metastore?

Piotrek

> On 9 Nov 2018, at 01:44, Zhang, Xuefu  wrote:
> 
> Hi Piotr,
> 
> That seems to be a good idea!
> 
> Since the google doc for the design is currently under extensive review, I 
> will leave it as it is for now. However, I'll convert it to two different 
> FLIPs when the time comes.
> 
> How does it sound to you?
> 
> Thanks,
> Xuefu
> 
> 
> --
> Sender:Piotr Nowojski 
> Sent at:2018 Nov 9 (Fri) 02:31
> Recipient:dev 
> Cc:Bowen Li ; Xuefu ; Shuyi 
> Chen 
> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
> 
> Hi,
> 
> Maybe we should split this topic (and the design doc) into a couple of smaller 
> ones, hopefully independent. The questions that you have asked, Fabian, have, 
> for example, very little to do with reading metadata from the Hive Meta Store?
> 
> Piotrek 
> 
>> On 7 Nov 2018, at 14:27, Fabian Hueske  wrote:
>> 
>> Hi Xuefu and all,
>> 
>> Thanks for sharing this design document!
>> I'm very much in favor of restructuring / reworking the catalog handling in
>> Flink SQL as outlined in the document.
>> Most changes described in the design document seem to be rather general and
>> not specifically related to the Hive integration.
>> 
>> IMO, there are some aspects, especially those at the boundary of Hive and
>> Flink, that need a bit more discussion. For example
>> 
>> * What does it take to make Flink schema compatible with Hive schema?
>> * How will Flink tables (descriptors) be stored in HMS?
>> * How do both Hive catalogs differ? Could they be integrated into to a
>> single one? When to use which one?
>> * What meta information is provided by HMS? What of this can be leveraged
>> by Flink?
>> 
>> Thank you,
>> Fabian
>> 
>> Am Fr., 2. Nov. 2018 um 00:31 Uhr schrieb Bowen Li :
>> 
>>> After taking a look at how other discussion threads work, I think it's
>>> actually fine to just keep our discussion here. It's up to you, Xuefu.
>>> 
>>> The google doc LGTM. I left some minor comments.
>>> 
>>> On Thu, Nov 1, 2018 at 10:17 AM Bowen Li  wrote:
>>> 
 Hi all,
 
 As Xuefu has published the design doc on google, I agree with Shuyi's
 suggestion that we probably should start a new email thread like "[DISCUSS]
 ... Hive integration design ..." on only dev mailing list for community
 devs to review. The current thread sends to both dev and user list.
 
 This email thread is more like validating the general idea and direction
 with the community, and it's been pretty long and crowded so far. Since
 everyone is pro for the idea, we can move forward with another thread to
 discuss and finalize the design.
 
 Thanks,
 Bowen
 
 On Wed, Oct 31, 2018 at 12:16 PM Zhang, Xuefu 
 wrote:
 
> Hi Shuyi,
> 
> Good idea. Actually the PDF was converted from a google doc. Here is its
> link:
> 
> https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing
> Once we reach an agreement, I can convert it to a FLIP.
> 
> Thanks,
> Xuefu
> 
> 
> 
> --
> Sender:Shuyi Chen 
> Sent at:2018 Nov 1 (Thu) 02:47
> Recipient:Xuefu 
> Cc:vino yang ; Fabian Hueske ;
> dev ; user 
> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
> 
> Hi Xuefu,
> 
> Thanks a lot for driving this big effort. I would suggest converting your
> proposal and design doc into a google doc, and sharing it on the dev mailing
> list for the community to review and comment on, with a title like "[DISCUSS]
> ... Hive integration design ...". Once approved, we can document it as a FLIP
> (Flink Improvement Proposals), and use JIRAs to track the implementations.
> What do you think?
> 
> Shuyi


Re: Kinesis consumer e2e test

2018-11-12 Thread Stefan Richter
Hi,

yes, that is correct. The failure mapper is there to cause a failover event for 
which we can then check i) that exactly-once or at-least-once is not violated, 
depending on the expected semantics and ii) that the restore works at all ;-). 
You might be able to reuse org.apache.flink.streaming.tests.FailureMapper for 
this. For the future, it would surely also be nice to have a test that covers 
rescaling as well, but for now just having any real test is already a great 
improvement.

Best,
Stefan
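
As an illustration only, a minimal job of the shape described above could look roughly
like this (the stream names and failure threshold are made up, and the mapper below is a
simplified stand-in for org.apache.flink.streaming.tests.FailureMapper, not that class):

import java.util.Properties;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class KinesisExactlyOnceTestJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000L);

        Properties props = new Properties();
        props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

        FlinkKinesisConsumer<String> consumer =
                new FlinkKinesisConsumer<>("test-input-stream", new SimpleStringSchema(), props);

        FlinkKinesisProducer<String> producer =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), props);
        producer.setDefaultStream("test-output-stream");
        producer.setDefaultPartition("0");

        env.addSource(consumer)
                // fail once after n records to trigger a failover and restore
                .map(new OnceFailingMapper(1000))
                .addSink(producer);

        env.execute("Kinesis consumer/producer e2e test");
    }

    /** Throws once after {@code failAfter} records, but only on the first attempt. */
    private static class OnceFailingMapper extends RichMapFunction<String, String> {
        private final int failAfter;
        private int seen;

        OnceFailingMapper(int failAfter) {
            this.failAfter = failAfter;
        }

        @Override
        public String map(String value) {
            if (++seen >= failAfter && getRuntimeContext().getAttemptNumber() == 0) {
                throw new RuntimeException("Intentional failure to trigger restore");
            }
            return value;
        }
    }
}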

> On 12. Nov 2018, at 05:23, Thomas Weise  wrote:
> 
> Hi Stefan,
> 
> Thanks for the info. So if I understand correctly, the pipeline you had in
> mind is:
> 
> Consumer -> Map -> Producer
> 
> What do you expect as outcome of the mapper failure? That no records are
> lost but some possibly duplicated in the sink?
> 
> Regarding the abstraction, I will see what I can do in that regard. From
> where I start it may make more sense to do some of that as follow-up when
> the Kafka test is ported.
> 
> Thanks,
> Thomas
> 
> 
> On Thu, Nov 8, 2018 at 10:20 AM Stefan Richter 
> wrote:
> 
>> Hi,
>> 
>> I was also just planning to work on it before Stephan contacted Thomas to
>> ask about this test.
>> 
>> Thomas, you are right about the structure, the test should also go into
>> the `run-nightly-tests.sh`. What I was planning to do is a simple job that
>> consists of a Kinesis consumer, a mapper that fails once after n records,
>> and a kinesis producer. I was hoping that creation, filling, and validation
>> of the Kinesis topics can be done with the Java API, not by invoking
>> commands in a bash script. In general I would try to minimise the amount of
>> scripting and do as much in Java as possible. It would also be nice if the
>> test was generalised, e.g. that abstract Producer/Consumer are created from
>> a Supplier and also the validation is done over some abstraction that lets
>> us iterate over the produced output. Ideally, this would be a test that we
>> can reuse for all Consumer/Producer cases and we could also port the tests
>> for Kafka to that. What do you think?
>> 
>> Best,
>> Stefan
>> 
>>> On 8. Nov 2018, at 07:22, Tzu-Li (Gordon) Tai 
>> wrote:
>>> 
>>> Hi Thomas,
>>> 
>>> I think Stefan Richter is also working on the Kinesis end-to-end test,
>> and
>>> seems to be planning to implement it against a real Kinesis service
>> instead
>>> of Kinesalite.
>>> Perhaps efforts should be synced here.
>>> 
>>> Cheers,
>>> Gordon
>>> 
>>> 
>>> On Thu, Nov 8, 2018 at 1:38 PM Thomas Weise  wrote:
>>> 
 Hi,
 
 I'm planning to add an end-to-end test for the Kinesis consumer. We have
 done something similar at Lyft, using Kinesalite, which can be run as
 a Docker container.
 
 I see that some tests already make use of Docker, so we can assume it
>> to be
 present in the target environment(s)?
 
 I also found the following ticket:
 https://issues.apache.org/jira/browse/FLINK-9007
 
 It suggests also covering the producer, which may be a good way to create
 the input data as well. The stream itself can be created with the
>> Kinesis
 Java SDK.
 
 Following the existing layout, there would be a new module
 flink-end-to-end-tests/flink-kinesis-test
 
 Are there any suggestions or comments regarding this?
 
 Thanks,
 Thomas
 
>> 
>> 
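
As a rough sketch of the generalisation Stefan mentions above (sources and sinks supplied
per connector, and validation over an abstraction that iterates the produced output), the
shared fixture could look something like this; all names here are made up:

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Sketch of a connector-agnostic e2e test fixture: Kinesis, Kafka, etc. would each
 * provide an implementation, and a shared test drives the job and validates output.
 */
interface ConnectorTestFixture<T> {
    /** Create the external resources (streams/topics) and seed the input data. */
    void setUp() throws Exception;

    SourceFunction<T> createSource();

    SinkFunction<T> createSink();

    /** Iterate over whatever the job produced, for exactly-once/at-least-once checks. */
    Iterable<T> readProducedRecords() throws Exception;

    void tearDown() throws Exception;
}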



Re: How to use RocksDBStateBackend predefined options

2018-11-12 Thread Thomas Weise
Sounds good. Perhaps it would also be good to allow the user to specify an
options factory in flink-conf.yaml for more flexibility?

Thomas

On Mon, Nov 12, 2018 at 9:48 AM Stefan Richter 
wrote:

> Hi,
>
> Ufuk is right, for historical reasons there is currently only the
> programmatic way, but I think nothing speaks fundamentally against offering
> configuration via config in the future (maybe just a lot of config keys
> must be introduced to cover all options).
>
> Best,
> Stefan
>
> > On 9. Nov 2018, at 22:52, Ufuk Celebi  wrote:
> >
> > Hey Thomas,
> >
> > On Fri, Nov 9, 2018 at 6:07 PM Thomas Weise  wrote:
> >> Is there a way to activate the predefined options via configuration /
> flink-
> >> conf.yaml? Or only programmatically, like in [4]? The difficulty with
> the
> >> programmatic route (assuming this works now), is that in my case the
> client
> >> is Beam and I'm not writing the code that submits the job.
> >
> > AFAIK no. You can only do it programmatically at the moment [1].
> > Having the option to configure all settings through the configuration
> > file seems to be a valid feature request to me.
> >
> > @Stefan Richter (cc'd): Are there any reasons that speak against
> > exposing these options via the configuration file in your opinion?
> >
> > Best,
> >
> > Ufuk
> >
> > [1] Looking at the code in [2] and [3], the only options that are
> > exposed via the configuration file are local directories and the timer
> > service factory.
> > [2]
> https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
> > [3]
> https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
>
>
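
For reference, the programmatic route discussed above looks roughly like the following
sketch (the checkpoint URI and the tuning values are placeholders):

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class RocksDbOptionsExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // placeholder checkpoint URI
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");

        // pick one of the predefined option profiles ...
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);

        // ... and/or fine-tune RocksDB through an OptionsFactory
        backend.setOptions(new OptionsFactory() {
            @Override
            public DBOptions createDBOptions(DBOptions currentOptions) {
                return currentOptions.setMaxBackgroundCompactions(4);
            }

            @Override
            public ColumnFamilyOptions createColumnFamilyOptions(ColumnFamilyOptions currentOptions) {
                return currentOptions;
            }
        });

        env.setStateBackend(backend);
        // ... define and execute the job as usual
    }
}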




[jira] [Created] (FLINK-10855) CheckpointCoordinator does not delete checkpoint directory of late/failed checkpoints

2018-11-12 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10855:
-

 Summary: CheckpointCoordinator does not delete checkpoint 
directory of late/failed checkpoints
 Key: FLINK-10855
 URL: https://issues.apache.org/jira/browse/FLINK-10855
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.6.2, 1.5.5, 1.7.0
Reporter: Till Rohrmann


In case an acknowledge checkpoint message is late or a checkpoint cannot be 
acknowledged, we discard the subtask state in the {{CheckpointCoordinator}}. 
What does not happen in this case is deleting the parent directory of the 
checkpoint. This only happens when a pending checkpoint is disposed via 
{{PendingCheckpoint#dispose}}.

Due to this behaviour it can happen that a checkpoint fails (e.g. because a task is 
not ready) and we delete the checkpoint directory. Next, another task writes 
its checkpoint data to the checkpoint directory (thereby creating it again) and 
sends an acknowledge message back to the {{CheckpointCoordinator}}. The 
{{CheckpointCoordinator}} will realize that there is no longer a 
{{PendingCheckpoint}} and will discard the subtask state. This will remove the 
state files from the checkpoint directory but will leave the checkpoint 
directory itself untouched.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10856) Harden resume from externalized checkpoint E2E test

2018-11-12 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10856:
-

 Summary: Harden resume from externalized checkpoint E2E test
 Key: FLINK-10856
 URL: https://issues.apache.org/jira/browse/FLINK-10856
 Project: Flink
  Issue Type: Bug
  Components: E2E Tests, State Backends, Checkpointing
Affects Versions: 1.6.2, 1.5.5, 1.7.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.6, 1.6.3, 1.7.0


The resume from externalized checkpoints E2E test can fail due to FLINK-10855. 
We should harden the test script so that it does not expect a single checkpoint 
directory to be present, but instead takes the checkpoint with the highest 
checkpoint counter.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10853) Provide end-to-end test for Kafka 0.9 connector

2018-11-12 Thread vinoyang (JIRA)
vinoyang created FLINK-10853:


 Summary: Provide end-to-end test for Kafka 0.9 connector
 Key: FLINK-10853
 URL: https://issues.apache.org/jira/browse/FLINK-10853
 Project: Flink
  Issue Type: Test
  Components: E2E Tests
Reporter: vinoyang
Assignee: vinoyang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10854) Provide end-to-end test for Kafka 0.8 connector

2018-11-12 Thread vinoyang (JIRA)
vinoyang created FLINK-10854:


 Summary: Provide end-to-end test for Kafka 0.8 connector
 Key: FLINK-10854
 URL: https://issues.apache.org/jira/browse/FLINK-10854
 Project: Flink
  Issue Type: Test
  Components: E2E Tests
Reporter: vinoyang
Assignee: vinoyang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


REST job submission

2018-11-12 Thread Flavio Pompermaier
Hi to all,
in our ETL we need to call an external (REST) service once a job ends: we
extract information about accumulators and we update the job status.
However, this is only possible when using the CLI client: if we submit the job
via the REST API or Web UI (which is very useful to decouple our UI from the
Flink cluster) then this is not possible, because the REST API cannot
execute any code after env.execute().
I think this is a huge limitation: first of all, when writing
(and debugging) a Flink job, you assume that you can call execute() multiple
times and use the returned JobExecutionResult.
Secondly, the binary client and the REST client behave
differently (with the CLI client everything works as expected).
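
A sketch of the pattern described above; the accumulator name and the notification
call are placeholders:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class EtlJobWithCallback {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // placeholder for the real ETL pipeline
        env.fromElements(1, 2, 3).output(new DiscardingOutputFormat<>());

        // This part only runs when the client executes main() to completion,
        // which is the case for the CLI client but not for REST/Web UI submission.
        JobExecutionResult result = env.execute("etl-job");
        Long processed = result.getAccumulatorResult("processed-records"); // hypothetical accumulator
        notifyJobStatusService(result.getJobID().toString(), processed);
    }

    /** Placeholder for the external REST call described above. */
    private static void notifyJobStatusService(String jobId, Long processed) {
        System.out.println("job " + jobId + " processed " + processed + " records");
    }
}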

What do you think about this? Is this a bug or not?

PS: I also think that the REST client should not be aware of any jar or
class instance; it should just call the job manager with the proper class
name and jar id (plus other options, of course).

Cheers,
Flavio


[jira] [Created] (FLINK-10852) Decremented number of unfinished producers below 0. This is most likely a bug in the execution state/intermediate result partition management

2018-11-12 Thread ouyangzhe (JIRA)
ouyangzhe created FLINK-10852:
-

 Summary: Decremented number of unfinished producers below 0. This 
is most likely a bug in the execution state/intermediate result partition 
management
 Key: FLINK-10852
 URL: https://issues.apache.org/jira/browse/FLINK-10852
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.7.0
Reporter: ouyangzhe
 Fix For: 1.8.0


 
{panel:title=Jobs using the DataSet iteration operator hang in FAILING state on 
failover and throw the following exception, if jobmanager.execution.failover-strategy: 
region is set.}


java.lang.IllegalStateException: Decremented number of unfinished producers below 0. This is most likely a bug in the execution state/intermediate result partition management.
 at org.apache.flink.runtime.executiongraph.IntermediateResultPartition.markFinished(IntermediateResultPartition.java:103)
 at org.apache.flink.runtime.executiongraph.ExecutionVertex.finishAllBlockingPartitions(ExecutionVertex.java:707)
 at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:939)
 at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1568)
 at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:542)
 at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
 at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
 at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
 at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
 at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
 at akka.actor.ActorCell.invoke(ActorCell.scala:495)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
 at akka.dispatch.Mailbox.run(Mailbox.scala:224)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{panel}
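
For context, a minimal sketch (not from the reporter) of a job matching that description,
i.e. a DataSet bulk iteration run with region failover enabled:

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class BulkIterationJob {
    public static void main(String[] args) throws Exception {
        // assumes flink-conf.yaml sets: jobmanager.execution.failover-strategy: region
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        IterativeDataSet<Long> iteration = env.generateSequence(1, 1_000).iterate(10);
        DataSet<Long> body = iteration.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) {
                return value + 1;
            }
        });

        iteration.closeWith(body).output(new DiscardingOutputFormat<Long>());
        env.execute("DataSet bulk iteration");
    }
}
{code}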
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10851) sqlUpdate support complex insert grammar

2018-11-12 Thread frank wang (JIRA)
frank wang created FLINK-10851:
--

 Summary: sqlUpdate support complex insert grammar
 Key: FLINK-10851
 URL: https://issues.apache.org/jira/browse/FLINK-10851
 Project: Flink
  Issue Type: Bug
Reporter: frank wang


My code is:
{{tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1, 
filedName2 from kafka.sdkafka.order_4");}}

but Flink gives me an error saying "No table was registered under the 
name kafka".
I modified the code and it works now.
TableEnvironment.scala


{code:java}
def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
  val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
  // parse the sql query
  val parsed = planner.parse(stmt)
  parsed match {
    case insert: SqlInsert =>
      // validate the SQL query
      val query = insert.getSource
      val validatedQuery = planner.validate(query)

      // get query result as Table
      val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))

      // get name of sink table
      val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)

      // insert query result into sink table
      insertInto(queryResult, targetTableName, config)
    case _ =>
      throw new TableException(
        "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
  }
}
{code}
It should be modified to this:
{code:java}
def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
  val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
  // parse the sql query
  val parsed = planner.parse(stmt)
  parsed match {
    case insert: SqlInsert =>
      // validate the SQL query
      val query = insert.getSource
      val validatedQuery = planner.validate(query)

      // get query result as Table
      val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))

      // get name of sink table
      //val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
      val targetTableName = insert.getTargetTable.toString

      // insert query result into sink table
      insertInto(queryResult, targetTableName, config)
    case _ =>
      throw new TableException(
        "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
  }
}
{code}
 

I hope this can be accepted, thanks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10850) Job may hang on FAILING state if taskmanager updateTaskExecutionState failed

2018-11-12 Thread ouyangzhe (JIRA)
ouyangzhe created FLINK-10850:
-

 Summary: Job may hang on FAILING state if taskmanager 
updateTaskExecutionState failed
 Key: FLINK-10850
 URL: https://issues.apache.org/jira/browse/FLINK-10850
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.5.5
Reporter: ouyangzhe
 Fix For: 1.8.0


I encountered a job which ran out of memory and hung in FAILING state. It left 3 
slots to release, and the corresponding task state is CANCELING.

I found the following log in the taskmanager; it seems that the taskmanager tried 
to updateTaskExecutionState from CANCELING to CANCELED, but OOMed.
{panel}


2018-11-08 18:01:23,250 INFO  org.apache.flink.runtime.taskmanager.Task                     - PartialSolution (BulkIteration (Bulk Iteration)) (97/600) (46005ba837efc4ebf783fc92121e55a8) switched from RUNNING to CANCELING.
2018-11-08 18:01:23,257 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code PartialSolution (BulkIteration (Bulk Iteration)) (97/600) (46005ba837efc4ebf783fc92121e55a8).
2018-11-08 18:01:44,081 INFO  org.apache.flink.runtime.taskmanager.Task                     - PartialSolution (BulkIteration (Bulk Iteration)) (97/600) (46005ba837efc4ebf783fc92121e55a8) switched from CANCELING to CANCELED.
2018-11-08 18:01:44,081 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for PartialSolution (BulkIteration (Bulk Iteration)) (97/600) (46005ba837efc4ebf783fc92121e55a8).
2018-11-08 18:02:03,097 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'PartialSolution (BulkIteration (Bulk Iteration)) (97/600)' did not react to cancelling signal for 30 seconds, but is stuck in method:
 org.apache.flink.shaded.guava18.com.google.common.collect.Maps$EntryFunction$1.apply(Maps.java:86)
org.apache.flink.shaded.guava18.com.google.common.collect.Iterators$8.transform(Iterators.java:799)
org.apache.flink.shaded.guava18.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48)
java.util.AbstractCollection.toArray(AbstractCollection.java:141)
org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:258)
org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartitionsProducedBy(ResultPartitionManager.java:100)
org.apache.flink.runtime.io.network.NetworkEnvironment.unregisterTask(NetworkEnvironment.java:275)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:833)
java.lang.Thread.run(Thread.java:745)

2018-11-08 18:02:05,665 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Discarding the results produced by task execution e9141e20871e530dee904ddce11adca0.
2018-11-08 18:02:22,536 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Discarding the results produced by task execution 7fac76a5d76247d803e1f1c47a6b385f.
2018-11-08 18:03:47,210 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'PartialSolution (BulkIteration (Bulk Iteration)) (97/600)' did not react to cancelling signal for 30 seconds, but is stuck in method:
 org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:497)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:837)
java.lang.Thread.run(Thread.java:745)

2018-11-08 18:03:47,213 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task PartialSolution (BulkIteration (Bulk Iteration)) (97/600) (46005ba837efc4ebf783fc92121e55a8) [CANCELED]
2018-11-08 18:03:47,215 WARN  org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline  - An exception was thrown by a user handler while handling an exception event ([id: 0x397132f7, /11.10.199.197:33286 => /11.9.137.228:40859] EXCEPTION: java.lang.OutOfMemoryError: GC overhead limit exceeded)
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at org.apache.flink.shaded.akka.org.jboss.netty.buffer.HeapChannelBuffer.<init>(HeapChannelBuffer.java:42)
        at org.apache.flink.shaded.akka.org.jboss.netty.buffer.BigEndianHeapChannelBuffer.<init>(BigEndianHeapChannelBuffer.java:34)
        at org.apache.flink.shaded.akka.org.jboss.netty.buffer.ChannelBuffers.buffer(ChannelBuffers.java:134)
        at org.apache.flink.shaded.akka.org.jboss.netty.buffer.HeapChannelBufferFactory.getBuffer(HeapChannelBufferFactory.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.buffer.AbstractChannelBufferFactory.getBuffer(AbstractChannelBufferFactory.java:48)
        at org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.FrameDecoder.extractFrame(FrameDecoder.java:566)
        at