Re: windowAll and AggregateFunction

2019-01-09 Thread CPC
Hi Ken,

I am doing a global distinct. What I want to achieve is something like below.
With windowAll, Flink sends all data to a single operator, which means shuffling all
the data and computing with parallelism 1. I don't want to shuffle the raw data; I just
want to feed it to an HLL instance, shuffle only the HLL instances at the end of the
window, and merge them. This is exactly the same scenario as a global count: suppose
you want to count events per 1-minute window. In the current case we have to send all
data to a single operator and count there. Instead, we could compute sub-totals and
then send those sub-totals to a single operator and merge them there.
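
As an illustration (a sketch only, not code from this thread), the "sub-totals then
merge" pattern can be expressed with keyed windows, as Ken suggests below. The keyBy
still implies a shuffle, but it is keyed on the counted value rather than sending
everything to one instance. Event is a placeholder record type, and the Hll class is a
self-contained stand-in (an exact set) for a real HyperLogLog implementation.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class PartialDistinctSketch {

    public static DataStream<Long> distinctPerMinute(DataStream<Event> events) {
        // assumes timestamps/watermarks are assigned upstream (or use processing-time windows)
        return events
                // key on the value being counted, so each parallel window sees only a slice of it
                .keyBy("userId")
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                // partial aggregates, computed in parallel
                .aggregate(new HllAggregate())
                // only the partial aggregates reach the single (parallelism-1) merge operator
                .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
                .reduce((a, b) -> a.merge(b))
                .map(hll -> hll.cardinality());
    }

    public static class HllAggregate implements AggregateFunction<Event, Hll, Hll> {
        @Override public Hll createAccumulator() { return new Hll(); }
        @Override public Hll add(Event e, Hll acc) { acc.add(e.userId); return acc; }
        @Override public Hll getResult(Hll acc) { return acc; }
        @Override public Hll merge(Hll a, Hll b) { return a.merge(b); }
    }

    // Placeholder for the real event type.
    public static class Event {
        public String userId;
        public long timestamp;
    }

    // Stand-in accumulator so the sketch compiles on its own; a real job would use an
    // actual HyperLogLog implementation exposing the same add/merge/cardinality methods.
    public static class Hll implements java.io.Serializable {
        private final java.util.HashSet<String> values = new java.util.HashSet<>();
        public void add(String v) { values.add(v); }
        public Hll merge(Hll other) { values.addAll(other.values); return this; }
        public long cardinality() { return values.size(); }
    }
}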



On Thu, 10 Jan 2019 at 02:26, Ken Krugler 
wrote:

>
> On Jan 9, 2019, at 3:10 PM, CPC  wrote:
>
> Hi Ken,
>
> From regular time-based windows do you mean keyed windows?
>
>
> Correct. Without doing a keyBy() you would have a parallelism of 1.
>
> I think you want to key on whatever you’re counting for unique values, so
> that each window operator gets a slice of the unique values.
>
> — Ken
>
> On Wed, Jan 9, 2019, 10:22 PM Ken Krugler  wrote:
>
>> Hi there,
>>
>> You should be able to use a regular time-based window(), and emit the
>> HyperLogLog binary data as your result, which then would get merged in your
>> custom function (which you set a parallelism of 1 on).
>>
>> Note that if you are generating unique counts per non-overlapping time
>> window, you’ll need to keep N HLL structures in each operator.
>>
>> — Ken
>>
>>
>> On Jan 9, 2019, at 10:26 AM, CPC  wrote:
>>
>> Hi Stefan,
>>
>> Could i use "Reinterpreting a pre-partitioned data stream as keyed
>> stream" feature for this?
>>
>> On Wed, 9 Jan 2019 at 17:50, Stefan Richter 
>> wrote:
>>
>>> Hi,
>>>
>>> I think your expectation about windowAll is wrong, from the method
>>> documentation: “Note: This operation is inherently non-parallel since all
>>> elements have to pass through the same operator instance” and I also cannot
>>> think of a way in which the windowing API would support your use case
>>> without a shuffle. You could probably build the functionality by hand
>>> through, but I guess this is not quite what you want.
>>>
>>> Best,
>>> Stefan
>>>
>>> > On 9. Jan 2019, at 13:43, CPC  wrote:
>>> >
>>> > Hi all,
>>> >
>>> > In our implementation,we are consuming from kafka and calculating
>>> distinct with hyperloglog. We are using windowAll function with a custom
>>> AggregateFunction but flink runtime shows a little bit unexpected behavior
>>> at runtime. Our sources running with parallelism 4 and i expect add
>>> function to run after source calculate partial results and at the end of
>>> the window i expect it to send 4 hll object to single operator to merge
>>> there(merge function). Instead, it sends all data to single instance and
>>> call add function there.
>>> >
>>> > Is here any way to make flink behave like this? I mean calculate
>>> partial results after consuming from kafka with paralelism of sources
>>> without shuffling(so some part of the calculation can be calculated in
>>> parallel) and merge those partial results with a merge function?
>>> >
>>> > Thank you in advance...
>>>
>>>
>> --
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>>
>>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>


Re: ConnectTimeoutException when createPartitionRequestClient

2019-01-09 Thread Wenrui Meng
Hi Zhijiang,

Thanks for your reply. I also suspected the same reason at first. But reading the
connected host's log, the netty server starts listening on the correct port within
2 seconds of the task manager starting. Comparing the logs of the connected host and
the connecting host, it looks like the partition request happened after the connected
host's netty server had started.

But I think there is some other hardware or config issue, as I replied to Till. I will
work with our infra team to see whether there is some obvious issue on that cluster.
Meanwhile, it would be good to know how to configure the netty NIO channel timeout.
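
(For reference, the Flink option that bounds this client connect attempt, mentioned
again later in this thread, is set in flink-conf.yaml, for example:

taskmanager.network.netty.client.connectTimeoutSec: 1200

Whether some other setting also limits the connection attempt is exactly the open
question here.)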

Thanks,
Wenrui

On Wed, Jan 9, 2019 at 7:49 PM zhijiang  wrote:

> Hi Wenrui,
>
> I suspect another issue which might cause connection failure. You can
> check whether the netty server already binds and listens port successfully
> in time before the client requests connection. If there exists some
> time-consuming process during TM startup which might delay netty server
> start, so when the client requests connection, the server is not ready
> which may cause connection timeout or failure.
>
> From your description, it seems exist in only some TM. Because when you
> decrease the total parallel, it might miss the problem TM and does not
> cause this issue. The default number of netty thread and timeout should
> make sense for normal cases.
>
> Best,
> Zhijiang
>
> --
> From:Wenrui Meng 
> Send Time:2019年1月9日(星期三) 18:18
> To:Till Rohrmann 
> Cc:user ; Konstantin 
> Subject:Re: ConnectTimeoutException when createPartitionRequestClient
>
> Hi Till,
>
> This job is not on AthenaX but on a special uber version Flink. I tried to
> ping the connected host from connecting host. It seems very stable. For the
> connection timeout, I do set it as 20min but it still report the timeout
> after 2 minutes. Could you let me know how do you test locally about the
> timeout setting?
>
> Thanks,
> Wenrui
>
> On Tue, Jan 8, 2019 at 7:06 AM Till Rohrmann  wrote:
> Hi Wenrui,
>
> the exception now occurs while finishing the connection creation. I'm not
> sure whether this is so different. Could it be that your network is
> overloaded or not very reliable? Have you tried running your Flink job
> outside of AthenaX?
>
> Cheers,
> Till
>
> On Tue, Jan 8, 2019 at 2:50 PM Wenrui Meng  wrote:
> Hi Till,
>
> Thanks for your reply. Our cluster is Yarn cluster. I found that if we
> decrease the total parallel the timeout issue can be avoided. But we do
> need that amount of taskManagers to process data. In addition, once I
> increase the netty server threads to 128, the error is changed to to
> following error. It seems the cause is different. Could you help take a
> look?
>
> 2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED.
> java.io.IOException: Connecting the channel failed: Connecting to remote
> task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might
> indicate that the remote task manager has been lost.
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
> at
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
> at
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
> at
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
> at
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134)
> at
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148)
> at
> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466'
> has failed. This might indicate that the remote task manager has been lost.
> at
> 

Re: ConnectTimeoutException when createPartitionRequestClient

2019-01-09 Thread Wenrui Meng
Hi Till,

I will try the local test according to your suggestion. The Uber Flink version mainly
adds integration with Uber's deployment and other infra components; there is no change
to the original Flink code flow.

I also found that the issue can be avoided with the same settings in other clusters,
so I guess there is a network or other hardware issue. But it is still painful that
taskmanager.network.netty.client.connectTimeoutSec might not be the only config
affecting the netty connection timeout, since after increasing it to 1200 seconds the
timeout still happened after 2 minutes.

Thanks,
Wenrui

On Wed, Jan 9, 2019 at 2:06 AM Till Rohrmann  wrote:

> Hi Wenrui,
>
> I executed AutoParallelismITCase#testProgramWithAutoParallelism and set a
> breakpoint in NettClient.java:102 to see whether the configured timeout
> value is correctly set. Moreover, I did the same for
> AbstractNioChannel.java:207 and it looked as if the correct timeout value
> was set.
>
> What is the special uber Flink version? What patches does it include? Are
> you able to run your tests with the latest vanilla Flink version?
>
> Cheers,
> Till
>
> On Wed, Jan 9, 2019 at 10:40 AM Wenrui Meng  wrote:
>
>> Hi Till,
>>
>> This job is not on AthenaX but on a special uber version Flink. I tried
>> to ping the connected host from connecting host. It seems very stable. For
>> the connection timeout, I do set it as 20min but it still report the
>> timeout after 2 minutes. Could you let me know how do you test locally
>> about the timeout setting?
>>
>> Thanks,
>> Wenrui
>>
>> On Tue, Jan 8, 2019 at 7:06 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Wenrui,
>>>
>>> the exception now occurs while finishing the connection creation. I'm
>>> not sure whether this is so different. Could it be that your network is
>>> overloaded or not very reliable? Have you tried running your Flink job
>>> outside of AthenaX?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jan 8, 2019 at 2:50 PM Wenrui Meng  wrote:
>>>
 Hi Till,

 Thanks for your reply. Our cluster is Yarn cluster. I found that if we
 decrease the total parallel the timeout issue can be avoided. But we do
 need that amount of taskManagers to process data. In addition, once I
 increase the netty server threads to 128, the error is changed to to
 following error. It seems the cause is different. Could you help take a
 look?

 2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED.
 java.io.IOException: Connecting the channel failed: Connecting to
 remote task manager + 'athena464-sjc1/10.70.129.13:39466' has failed.
 This might indicate that the remote task manager has been lost.
 at
 org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
 at
 org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
 at
 org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
 at
 org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
 at
 org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
 at
 org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
 at
 org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134)
 at
 org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148)
 at
 org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
 at
 org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
 at
 org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 at java.lang.Thread.run(Thread.java:748)
 Caused by:
 org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
 Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466'
 has failed. This might indicate that the remote task manager has been lost.
 at
 org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
 at
 

Custom Serializer for Avro GenericRecord

2019-01-09 Thread Gagan Agrawal
Hi,
I am using Avro GenericRecord for most of the IN/OUT types of my custom
functions. What I have noticed is that the default Avro GenericRecord
serializer also serializes the schema, which makes messages very heavy and
hence impacts overall performance. In my case I already know the schema
beforehand and can therefore serialize/deserialize records without the schema,
which should reduce their size by a large factor. Are there any guidelines on
how such a custom serializer can be registered? One way I see could be to first
call enableForceKryo() on the StreamExecutionEnvironment's ExecutionConfig and
then register a custom serializer for the GenericRecord type as in [1]. However,
I am not sure whether that is the right approach or there is a better way to
achieve the same. I would also need some customization when storing these
messages in the state store (RocksDB in my case), which I think can be achieved
as in [2].

Please note I cannot use POJOs in my case, as this is a generalized framework on
top of Flink.
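
For illustration, a rough sketch of what the Kryo route from [1] could look like,
assuming the schema is known at job-submission time. The schema string below is only
a stand-in, and the class and registration names are illustrative, not existing code:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Serializes only the Avro binary payload, relying on a schema that is known up front.
public class SchemaLessGenericRecordSerializer extends Serializer<GenericData.Record> {

    // Stand-in schema; replace with the schema the framework already knows.
    private static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"Example\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}";
    private static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON);

    @Override
    public void write(Kryo kryo, Output output, GenericData.Record record) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bos, null);
            new GenericDatumWriter<GenericRecord>(SCHEMA).write(record, encoder);
            encoder.flush();
            byte[] bytes = bos.toByteArray();
            output.writeInt(bytes.length);
            output.writeBytes(bytes);
        } catch (IOException e) {
            throw new RuntimeException("Avro serialization failed", e);
        }
    }

    @Override
    public GenericData.Record read(Kryo kryo, Input input, Class<GenericData.Record> type) {
        try {
            byte[] bytes = input.readBytes(input.readInt());
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
            return (GenericData.Record) new GenericDatumReader<GenericRecord>(SCHEMA).read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException("Avro deserialization failed", e);
        }
    }
}

// Registration on the StreamExecutionEnvironment, following [1]:
//   env.getConfig().enableForceKryo();
//   env.getConfig().addDefaultKryoSerializer(GenericData.Record.class, SchemaLessGenericRecordSerializer.class);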

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/custom_serializers.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/custom_serialization.html

Gagan


Re: ConnectTimeoutException when createPartitionRequestClient

2019-01-09 Thread zhijiang
Hi Wenrui,

I suspect another issue which might cause the connection failure. You can check
whether the netty server has already bound and is listening on its port in time,
before the client requests the connection. If there is some time-consuming process
during TM startup that delays the netty server start, then when the client requests
the connection the server is not yet ready, which may cause a connection timeout or
failure.

From your description, it seems to happen on only some TMs. When you decrease the
total parallelism, you might miss the problematic TM, so the issue does not show up.
The default number of netty threads and the default timeout should make sense for
normal cases.

Best,
Zhijiang


--
From:Wenrui Meng 
Send Time:2019年1月9日(星期三) 18:18
To:Till Rohrmann 
Cc:user ; Konstantin 
Subject:Re: ConnectTimeoutException when createPartitionRequestClient

Hi Till,

This job is not on AthenaX but on a special uber version Flink. I tried to ping 
the connected host from connecting host. It seems very stable. For the 
connection timeout, I do set it as 20min but it still report the timeout after 
2 minutes. Could you let me know how do you test locally about the timeout 
setting?

Thanks,
Wenrui
On Tue, Jan 8, 2019 at 7:06 AM Till Rohrmann  wrote:
Hi Wenrui,

the exception now occurs while finishing the connection creation. I'm not sure 
whether this is so different. Could it be that your network is overloaded or 
not very reliable? Have you tried running your Flink job outside of AthenaX?

Cheers,
Till
On Tue, Jan 8, 2019 at 2:50 PM Wenrui Meng  wrote:
Hi Till,

Thanks for your reply. Our cluster is Yarn cluster. I found that if we decrease 
the total parallel the timeout issue can be avoided. But we do need that amount 
of taskManagers to process data. In addition, once I increase the netty server 
threads to 128, the error is changed to to following error. It seems the cause 
is different. Could you help take a look?

2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task 
manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate 
that the remote task manager has been lost.
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
at 
org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
at 
org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134)
at 
org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148)
at 
org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has 
failed. This might indicate that the remote task manager has been lost.
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
at 

Multiple MapState vs single nested MapState in stateful Operator

2019-01-09 Thread Gagan Agrawal
Hi,
I have a use case where 4 streams are merged (union), grouped on a common
key (keyBy), and then processed by a custom KeyedProcessFunction. I need to keep
state (RocksDB backend) for all 4 streams in this KeyedProcessFunction, where
each of the 4 streams is stored as a map. So I have 2 options:

1. Create a separate MapStateDescriptor for each of these streams and store
their events separately.
2. Create a single MapStateDescriptor with only 4 keys (corresponding to the
4 stream types), where each value is itself a Map holding the events of the
respective stream.

I want to understand whether there is any performance difference between these
approaches. Does keeping 4 different MapStates cause 4 RocksDB lookups when they
are accessed? Or are all of these MapStates stored internally in RocksDB in a
single row corresponding to the respective key (as per the keyed stream), and
hence fetched in a single call before the operator's processElement is called?
If each MapStateDescriptor results in a separate RocksDB lookup, then I think
keeping them in a single MapStateDescriptor would be more efficient, to minimize
RocksDB calls. Please advise; a sketch of the two options follows below.
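
To make the two options concrete, a minimal sketch (the Event type and its fields
are placeholders; only one of the four per-stream descriptors is shown):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Map;

public class FourStreamFunction
        extends KeyedProcessFunction<String, FourStreamFunction.Event, FourStreamFunction.Event> {

    // Option 1: one MapState (i.e. one state descriptor) per stream type
    private transient MapState<String, Event> streamAEvents;
    // ... plus three more descriptors for streams B, C and D

    // Option 2: a single MapState with 4 keys, each holding a nested Map as its value
    private transient MapState<String, Map<String, Event>> allStreams;

    @Override
    public void open(Configuration parameters) {
        streamAEvents = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("stream-a", Types.STRING, Types.POJO(Event.class)));
        allStreams = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("all-streams", Types.STRING,
                Types.MAP(Types.STRING, Types.POJO(Event.class))));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
        // Option 1: each entry is written/read individually through the MapState interface
        streamAEvents.put(value.id, value);

        // Option 2: the whole nested Map for this stream type is the unit of (de)serialization
        Map<String, Event> events = allStreams.get(value.streamType);
        // ...
    }

    // Placeholder for the unioned record type.
    public static class Event {
        public String id;
        public String streamType;
    }
}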

Gagan


Re: windowAll and AggregateFunction

2019-01-09 Thread Ken Krugler

> On Jan 9, 2019, at 3:10 PM, CPC  wrote:
> 
> Hi Ken,
> 
> From regular time-based windows do you mean keyed windows?

Correct. Without doing a keyBy() you would have a parallelism of 1.

I think you want to key on whatever you’re counting for unique values, so that 
each window operator gets a slice of the unique values.

— Ken

> On Wed, Jan 9, 2019, 10:22 PM Ken Krugler   wrote:
> Hi there,
> 
> You should be able to use a regular time-based window(), and emit the 
> HyperLogLog binary data as your result, which then would get merged in your 
> custom function (which you set a parallelism of 1 on).
> 
> Note that if you are generating unique counts per non-overlapping time 
> window, you’ll need to keep N HLL structures in each operator.
> 
> — Ken
> 
> 
>> On Jan 9, 2019, at 10:26 AM, CPC > > wrote:
>> 
>> Hi Stefan,
>> 
>> Could i use "Reinterpreting a pre-partitioned data stream as keyed stream" 
>> feature for this? 
>> 
>> On Wed, 9 Jan 2019 at 17:50, Stefan Richter > > wrote:
>> Hi,
>> 
>> I think your expectation about windowAll is wrong, from the method 
>> documentation: “Note: This operation is inherently non-parallel since all 
>> elements have to pass through the same operator instance” and I also cannot 
>> think of a way in which the windowing API would support your use case 
>> without a shuffle. You could probably build the functionality by hand 
>> through, but I guess this is not quite what you want.
>> 
>> Best,
>> Stefan
>> 
>> > On 9. Jan 2019, at 13:43, CPC > > > wrote:
>> > 
>> > Hi all,
>> > 
>> > In our implementation,we are consuming from kafka and calculating distinct 
>> > with hyperloglog. We are using windowAll function with a custom 
>> > AggregateFunction but flink runtime shows a little bit unexpected behavior 
>> > at runtime. Our sources running with parallelism 4 and i expect add 
>> > function to run after source calculate partial results and at the end of 
>> > the window i expect it to send 4 hll object to single operator to merge 
>> > there(merge function). Instead, it sends all data to single instance and 
>> > call add function there. 
>> > 
>> > Is here any way to make flink behave like this? I mean calculate partial 
>> > results after consuming from kafka with paralelism of sources without 
>> > shuffling(so some part of the calculation can be calculated in parallel) 
>> > and merge those partial results with a merge function?
>> > 
>> > Thank you in advance...
>> 
> 
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com 
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Producing binary Avro to Kafka

2019-01-09 Thread Elliot West
Hello,

What is the recommended Flink streaming approach for serialising a POJO to
Avro according to a schema and pushing the resulting byte array into a
Kafka sink? Also, is there any existing approach for prepending the schema
id to the payload (following the Confluent pattern)?
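
Not an authoritative answer, but one straightforward shape for this is a
SerializationSchema that writes the Confluent wire format (magic byte 0x0, a 4-byte
big-endian schema id, then the Avro binary) and is handed to the Kafka producer. A
sketch, assuming the schema id has already been registered and is known; MyPojo is a
placeholder type:

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flink.api.common.serialization.SerializationSchema;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class ConfluentStyleAvroSchema
        implements SerializationSchema<ConfluentStyleAvroSchema.MyPojo> {

    private final int schemaId;                         // id previously obtained from the schema registry
    private transient ReflectDatumWriter<MyPojo> writer;

    public ConfluentStyleAvroSchema(int schemaId) {
        this.schemaId = schemaId;
    }

    @Override
    public byte[] serialize(MyPojo element) {
        try {
            if (writer == null) {                       // lazy init: Avro classes are not serialization-friendly
                Schema schema = ReflectData.get().getSchema(MyPojo.class);
                writer = new ReflectDatumWriter<>(schema);
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            out.write(0);                                                 // Confluent magic byte
            out.write(ByteBuffer.allocate(4).putInt(schemaId).array());   // 4-byte schema id
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(element, encoder);             // Avro binary payload, no embedded schema
            encoder.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Failed to serialize record to Avro", e);
        }
    }

    // Placeholder POJO.
    public static class MyPojo {
        public long id;
        public String name;
    }
}

// Usage sketch:
//   stream.addSink(new FlinkKafkaProducer011<>("topic", new ConfluentStyleAvroSchema(42), kafkaProps));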

Thanks,

Elliot.


Re: windowAll and AggregateFunction

2019-01-09 Thread Ken Krugler
Hi there,

You should be able to use a regular time-based window(), and emit the 
HyperLogLog binary data as your result, which then would get merged in your 
custom function (which you set a parallelism of 1 on).

Note that if you are generating unique counts per non-overlapping time window, 
you’ll need to keep N HLL structures in each operator.

— Ken


> On Jan 9, 2019, at 10:26 AM, CPC  wrote:
> 
> Hi Stefan,
> 
> Could i use "Reinterpreting a pre-partitioned data stream as keyed stream" 
> feature for this? 
> 
> On Wed, 9 Jan 2019 at 17:50, Stefan Richter  > wrote:
> Hi,
> 
> I think your expectation about windowAll is wrong, from the method 
> documentation: “Note: This operation is inherently non-parallel since all 
> elements have to pass through the same operator instance” and I also cannot 
> think of a way in which the windowing API would support your use case without 
> a shuffle. You could probably build the functionality by hand through, but I 
> guess this is not quite what you want.
> 
> Best,
> Stefan
> 
> > On 9. Jan 2019, at 13:43, CPC  > > wrote:
> > 
> > Hi all,
> > 
> > In our implementation,we are consuming from kafka and calculating distinct 
> > with hyperloglog. We are using windowAll function with a custom 
> > AggregateFunction but flink runtime shows a little bit unexpected behavior 
> > at runtime. Our sources running with parallelism 4 and i expect add 
> > function to run after source calculate partial results and at the end of 
> > the window i expect it to send 4 hll object to single operator to merge 
> > there(merge function). Instead, it sends all data to single instance and 
> > call add function there. 
> > 
> > Is here any way to make flink behave like this? I mean calculate partial 
> > results after consuming from kafka with paralelism of sources without 
> > shuffling(so some part of the calculation can be calculated in parallel) 
> > and merge those partial results with a merge function?
> > 
> > Thank you in advance...
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: NoClassDefFoundError javax.xml.bind.DatatypeConverterImpl

2019-01-09 Thread Mike Mintz
For what it's worth, we believe we are able to work around this issue by
adding the following line to our flink-conf.yaml:

classloader.parent-first-patterns.additional: javax.xml.;org.apache.xerces.


On Thu, Dec 6, 2018 at 2:28 AM Chesnay Schepler  wrote:

> Small correction: Flink 1.7 does not support jdk9; we only fixed some of
> the issues, not all of them.
>
> On 06.12.2018 07:13, Mike Mintz wrote:
>
> Hi Flink developers,
>
> We're running some new DataStream jobs on Flink 1.7.0 using the shaded
> Hadoop S3 file system, and running into frequent errors saving checkpoints
> and savepoints to S3. I'm not sure what the underlying reason for the error
> is, but we often fail with the following stack trace, which appears to be
> due to missing the javax.xml.bind.DatatypeConverterImpl class in an
> error-handling path for AmazonS3Client.
>
> java.lang.NoClassDefFoundError: Could not initialize class
> javax.xml.bind.DatatypeConverterImpl
> at
> javax.xml.bind.DatatypeConverter.initConverter(DatatypeConverter.java:140)
> at
> javax.xml.bind.DatatypeConverter.printBase64Binary(DatatypeConverter.java:611)
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.util.Base64.encodeAsString(Base64.java:62)
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.util.Md5Utils.md5AsBase64(Md5Utils.java:104)
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1647)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:1531)
>
> I uploaded the full stack trace at
> https://gist.github.com/mikemintz/4769fc7bc3320c84ac97061e951041a0
>
> For reference, we're running flink from the "Apache 1.7.0 Flink only Scala
> 2.11" binary tgz, we've copied flink-s3-fs-hadoop-1.7.0.jar from opt/ to
> lib/, we're not defining HADOOP_CLASSPATH, and we're running java 8
> (openjdk version "1.8.0_191") on Ubuntu 18.04 x86_64.
>
> Presumably there are two issues: 1) some periodic error with S3, and 2)
> some classpath / class loading issue with
> javax.xml.bind.DatatypeConverterImpl that's preventing the original error
> from being displayed. I'm more curious about the later issue.
>
> This is super puzzling since javax/xml/bind/DatatypeConverterImpl.class is
> included in our rt.jar, and lsof confirms we're reading that rt.jar, so I
> suspect it's something tricky with custom class loaders or the way the
> shaded S3 jar works. Note that this class is not included in
> flink-s3-fs-hadoop-1.7.0.jar (which we are using), but it is included in
> flink-shaded-hadoop2-uber-1.7.0.jar (which we are not using).
>
> Another thing that jumped out to us was that Flink 1.7 is now able to
> build JDK9, but Java 9 includes deprecation of the javax.xml.bind
> libraries, requiring explicit inclusion in a Java 9 module [0]. And we saw
> that direct references to javax.xml.bind were removed from flink-core for
> 1.7 [1]
>
> Some things we tried, without success:
>
>- Building flink from source on a computer with java 8 installed. We
>still got NoClassDefFoundError.
>- Using the binary version of Flink on machines with java 9 installed.
>We get NullPointerException in ClosureCleaner.
>- Downloading the jaxb-api jar [2], which has
>javax/xml/bind/DatatypeConverterImpl.class, and setting HADOOP_CLASSPATH to
>have that jar. We still got NoClassDefFoundError.
>- Using iptables to completely block S3 traffic, hoping this would
>make it easier to reproduce. The connection errors are properly displayed,
>so these connection errors must go down another error handling path.
>
> Would love to hear any ideas about what might be happening, or further
> ideas we can try.
>
> Thanks!
> Mike
>
> [0]
> http://cr.openjdk.java.net/~iris/se/9/java-se-9-fr-spec/#APIs-proposed-for-removal
>
> [1] https://github.com/apache/flink/pull/6801
>
> [2] https://mvnrepository.com/artifact/javax.xml.bind/jaxb-api/2.3.1
>
>
>


Re: windowAll and AggregateFunction

2019-01-09 Thread CPC
Hi Stefan,

Could i use "Reinterpreting a pre-partitioned data stream as keyed stream"
feature for this?

On Wed, 9 Jan 2019 at 17:50, Stefan Richter 
wrote:

> Hi,
>
> I think your expectation about windowAll is wrong, from the method
> documentation: “Note: This operation is inherently non-parallel since all
> elements have to pass through the same operator instance” and I also cannot
> think of a way in which the windowing API would support your use case
> without a shuffle. You could probably build the functionality by hand
> through, but I guess this is not quite what you want.
>
> Best,
> Stefan
>
> > On 9. Jan 2019, at 13:43, CPC  wrote:
> >
> > Hi all,
> >
> > In our implementation,we are consuming from kafka and calculating
> distinct with hyperloglog. We are using windowAll function with a custom
> AggregateFunction but flink runtime shows a little bit unexpected behavior
> at runtime. Our sources running with parallelism 4 and i expect add
> function to run after source calculate partial results and at the end of
> the window i expect it to send 4 hll object to single operator to merge
> there(merge function). Instead, it sends all data to single instance and
> call add function there.
> >
> > Is here any way to make flink behave like this? I mean calculate partial
> results after consuming from kafka with paralelism of sources without
> shuffling(so some part of the calculation can be calculated in parallel)
> and merge those partial results with a merge function?
> >
> > Thank you in advance...
>
>


RE: Kerberos error when restoring from HDFS backend after 24 hours

2019-01-09 Thread LINZ, Arnaud
Hi,

I've managed to correct this by implementing my own FsStateBackend based on the 
original one with proper Kerberos relogin in createCheckpointStorage().
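
In rough outline (a sketch only, not the exact implementation; the principal and
keytab values are placeholders), such a backend looks like:

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;

// Re-logs in from the keytab right before the checkpoint storage (and its HDFS mkdirs)
// is created, so a restore after TGT expiry does not fail with the GSSException quoted below.
public class KerberosAwareFsStateBackend extends FsStateBackend {

    private final String principal;
    private final String keytabPath;

    public KerberosAwareFsStateBackend(String checkpointDataUri, String principal, String keytabPath) {
        super(checkpointDataUri);
        this.principal = principal;
        this.keytabPath = keytabPath;
    }

    @Override
    public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
        // refresh the Kerberos login before HDFS is touched
        UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
        return super.createCheckpointStorage(jobId);
    }
}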

Regards,
Arnaud

-Message d'origine-
De : LINZ, Arnaud
Envoyé : vendredi 4 janvier 2019 11:32
À : user 
Objet : Kerberos error when restoring from HDFS backend after 24 hours

Hello and happy new year to all flink users,

I have a streaming application (flink v1.7.0) on a Kerberized cluster, using a 
flink configuration file where the following parameters are set :

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: X
security.kerberos.login.principal: X

As it is not sufficient, I also log to Kerberos the "open()" method of 
sources/sinks using hdfs or hiveserver2/impala servers using  
UserGroupInformation.loginUserFromKeytab(). And as it is even not sufficient in 
some case (namely the HiveServer2/Impala connection), I also attach a Jaas 
object to the TaskManager setting java.security.auth.login.config property 
dynamically. And as it is in some rare cases not even sufficient, I do run 
kinit as an external process from the task manager to create a local ticket 
cache...

With all that stuff, everything works fine for several days when the streaming 
app does not experience any problem. However, when a problem occurs, when 
restoring from the checkpoint (hdfs backend), I get the following exception if 
it occurs after 24h from the initial application launch (24h is the Kerberos 
ticket validation time):

java.io.IOException: Failed on local exception: java.io.IOException: 
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: 
No valid credentials provided (Mechanism level: Failed to find any Kerberos 
tgt)]; Host Details : local host is: "x"; destination host is: "x;
org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:772)
org.apache.hadoop.ipc.Client.call(Client.java:1474)
org.apache.hadoop.ipc.Client.call(Client.java:1401)
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
com.sun.proxy.$Proxy9.mkdirs(Unknown Source)
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539)
sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
com.sun.proxy.$Proxy10.mkdirs(Unknown Source)
org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2742)
org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2713)
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:870)
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:866)
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:866)
org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:859)
org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1819)
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:112)
org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:83)
org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:58)
org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
java.lang.Thread.run(Thread.java:748)
Caused by : java.io.IOException: javax.security.sasl.SaslException: GSS 
initiate failed [Caused by GSSException: No valid credentials provided 
(Mechanism level: Failed to find any Kerberos tgt)]
org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:682)
java.security.AccessController.doPrivileged(Native Method)
javax.security.auth.Subject.doAs(Subject.java:422)
(...)

Any idea about how I can circumvent this? For instance, can I "hook" the 
restoring process before the mkdir to relog to Kerberos by hand?

Best regards,
Arnaud





Re: Can checkpoints be used to migrate jobs between Flink versions ?

2019-01-09 Thread Edward Alexander Rojas Clavijo
Thanks very much for your rapid answer, Stefan.

Regards,
Edward

On Wed, 9 Jan 2019 at 15:26, Stefan Richter () wrote:

> Hi,
>
> I would assume that this should currently work because the format of basic
> savepoints and checkpoints is the same right now. The restriction in the
> doc is probably there in case that the checkpoint format will diverge more
> in the future.
>
> Best,
> Stefan
>
> > On 9. Jan 2019, at 13:12, Edward Rojas  wrote:
> >
> > Hello,
> >
> > For upgrading jobs between Flink versions I follow the guide in the doc
> > here:
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/upgrading.html#upgrading-the-flink-framework-version
> >
> > It states that we should always use savepoints for this procedure, I
> > followed it and it works perfectly.
> >
> > I just would like to know if there is a reason why is not advised to use
> > checkpoints for this procedure.
> >
> > Say for example that the job has externalized checkpoints with
> > RETAIN_ON_CANCELLATION policy, one could cancel the job before the
> upgrade
> > and use the retained checkpoint to restart the job from it once the Flink
> > cluster is upgraded... or maybe I'm missing something ?
> >
> > I performed some tests and we are able to upgrade using checkpoint, by
> > passing the checkpoint path in the "flink run -s" parameter.
> >
> > Could you help to clarify if this is advised (and supported) or we should
> > stick to the use of savepoints for this kind of manipulations ?
> >
> >
> > Thanks in advance for your help.
> >
> > Edward
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>

--
Edward Alexander Rojas Clavijo
Software Engineer, Hybrid Cloud, IBM France


Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

2019-01-09 Thread sohimankotia
Hi Stefan,

Attaching logs. You can search for "2019-01-09 19:34:44,170 INFO
org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source:"
in the first 2 log files.

f3-part-aa.gz
f3-part-ab.gz
f3-part-ac.gz
f3-part-ad.gz
f3-part-ae.gz
f3-part-af.gz



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [DISCUSS] Dropping flink-storm?

2019-01-09 Thread Till Rohrmann
With https://issues.apache.org/jira/browse/FLINK-10571, we will remove the
Storm topologies from Flink and keep the wrappers for the moment.

However, looking at the FlinkTopologyContext [1], it becomes quite obvious
that Flink's compatibility with Storm is really limited. Almost all of the
context methods are not supported which makes me wonder how useful these
wrappers really are. Given the additional maintenance overhead of having
them in the code base and no indication that someone is actively using
them, I would still be in favour of removing them. This will reduce our
maintenance burden in the future. What do you think?

[1]
https://github.com/apache/flink/blob/master/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java

Cheers,
Till

On Tue, Oct 9, 2018 at 10:08 AM Fabian Hueske  wrote:

> Yes, let's do it this way.
> The wrapper classes are probably not too complex and can be easily tested.
> We have the same for the Hadoop interfaces, although I think only the
> Input- and OutputFormatWrappers are actually used.
>
>
> Am Di., 9. Okt. 2018 um 09:46 Uhr schrieb Chesnay Schepler <
> ches...@apache.org>:
>
>> That sounds very good to me.
>>
>> On 08.10.2018 11:36, Till Rohrmann wrote:
>> > Good point. The initial idea of this thread was to remove the storm
>> > compatibility layer completely.
>> >
>> > During the discussion I realized that it might be useful for our users
>> > to not completely remove it in one go. Instead for those who still
>> > want to use some Bolt and Spout code in Flink, it could be nice to
>> > keep the wrappers. At least, we could remove flink-storm in a more
>> > graceful way by first removing the Topology and client parts and then
>> > the wrappers. What do you think?
>> >
>> > Cheers,
>> > Till
>> >
>> > On Mon, Oct 8, 2018 at 11:13 AM Chesnay Schepler > > > wrote:
>> >
>> > I don't believe that to be the consensus. For starters it is
>> > contradictory; we can't /drop /flink-storm yet still /keep //some
>> > parts/.
>> >
>> > From my understanding we drop flink-storm completely, and put a
>> > note in the docs that the bolt/spout wrappers of previous versions
>> > will continue to work.
>> >
>> > On 08.10.2018 11:04, Till Rohrmann wrote:
>> >> Thanks for opening the issue Chesnay. I think the overall
>> >> consensus is to drop flink-storm and only keep the Bolt and Spout
>> >> wrappers. Thanks for your feedback!
>> >>
>> >> Cheers,
>> >> Till
>> >>
>> >> On Mon, Oct 8, 2018 at 9:37 AM Chesnay Schepler
>> >> mailto:ches...@apache.org>> wrote:
>> >>
>> >> I've created
>> >> https://issues.apache.org/jira/browse/FLINK-10509 for
>> >> removing flink-storm.
>> >>
>> >> On 28.09.2018 15:22, Till Rohrmann wrote:
>> >> > Hi everyone,
>> >> >
>> >> > I would like to discuss how to proceed with Flink's storm
>> >> compatibility
> >> >> > layer flink-storm.
>> >> >
>> >> > While working on removing Flink's legacy mode, I noticed
>> >> that some parts of
>> >> > flink-storm rely on the legacy Flink client. In fact, at
>> >> the moment
>> >> > flink-storm does not work together with Flink's new
>> distributed
>> >> > architecture.
>> >> >
>> >> > I'm also wondering how many people are actually using
>> >> Flink's Storm
>> >> > compatibility layer and whether it would be worth porting it.
>> >> >
>> >> > I see two options how to proceed:
>> >> >
>> >> > 1) Commit to maintain flink-storm and port it to Flink's
>> >> new architecture
>> >> > 2) Drop flink-storm
>> >> >
>> >> > I doubt that we can contribute it to Apache Bahir [1],
>> >> because once we
>> >> > remove the legacy mode, this module will no longer work
>> >> with all newer
>> >> > Flink versions.
>> >> >
>> >> > Therefore, I would like to hear your opinion on this and in
>> >> particular if
>> >> > you are using or planning to use flink-storm in the future.
>> >> >
>> >> > [1] https://github.com/apache/bahir-flink
>> >> >
>> >> > Cheers,
>> >> > Till
>> >> >
>> >>
>> >
>>
>>


Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

2019-01-09 Thread Stefan Richter
Hi,

Could you also provide the job master log?

Best,
Stefan

> On 9. Jan 2019, at 12:02, sohimankotia  wrote:
> 
> Hi,
> 
> I am running Flink Streaming Job with 1.5.5 version.
> 
> - Job is basically reading from Kafka , windowing on 2 minutes , and writing
> to hdfs using AvroBucketing Sink .
> - Job is running with parallelism 132
> - Checkpointing is enabled with interval of 1 minute.
> - Savepoint is enabled and getting triggered every 30 min .
> 
> 
> Few Modified Properties :
> 
> akka.ask.timeout: 15min
> akka.client.timeout: 900s
> akka.lookup.timeout: 60s
> akka.tcp.timeout : 900s
> 
> akka.watch.heartbeat.interval: 120s
> akka.watch.heartbeat.pause: 900s
> 
> Issues :
> 
> Job is getting restarted 3 to 4 time every day ( At random times). It simply
> says attempting to cancel task. No exception or logging . I tried to set 
> 
> log4j.logger.org.apache.flink.runtime.taskmanager.Task=DEBUG,file  
> 
> But nothing important is getting logged. 
> 
> Enabling DEBUGGING at Flink level is making Streaming Application to slow (
> so can not do that ).
> 
> Attaching Task logs .
> 
> task.gz
> 
>   
> 
> 
> Thanks
> Sohi
> 
> 
> 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: windowAll and AggregateFunction

2019-01-09 Thread Stefan Richter
Hi,

I think your expectation about windowAll is wrong, from the method 
documentation: “Note: This operation is inherently non-parallel since all 
elements have to pass through the same operator instance” and I also cannot 
think of a way in which the windowing API would support your use case without a 
shuffle. You could probably build the functionality by hand though, but I 
guess this is not quite what you want.

Best,
Stefan

> On 9. Jan 2019, at 13:43, CPC  wrote:
> 
> Hi all,
> 
> In our implementation,we are consuming from kafka and calculating distinct 
> with hyperloglog. We are using windowAll function with a custom 
> AggregateFunction but flink runtime shows a little bit unexpected behavior at 
> runtime. Our sources running with parallelism 4 and i expect add function to 
> run after source calculate partial results and at the end of the window i 
> expect it to send 4 hll object to single operator to merge there(merge 
> function). Instead, it sends all data to single instance and call add 
> function there. 
> 
> Is here any way to make flink behave like this? I mean calculate partial 
> results after consuming from kafka with paralelism of sources without 
> shuffling(so some part of the calculation can be calculated in parallel) and 
> merge those partial results with a merge function?
> 
> Thank you in advance...



Re: Can checkpoints be used to migrate jobs between Flink versions ?

2019-01-09 Thread Stefan Richter
Hi,

I would assume that this should currently work because the format of basic 
savepoints and checkpoints is the same right now. The restriction in the doc is 
probably there in case that the checkpoint format will diverge more in the 
future.

Best,
Stefan

> On 9. Jan 2019, at 13:12, Edward Rojas  wrote:
> 
> Hello,
> 
> For upgrading jobs between Flink versions I follow the guide in the doc
> here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/upgrading.html#upgrading-the-flink-framework-version
> 
> It states that we should always use savepoints for this procedure, I
> followed it and it works perfectly. 
> 
> I just would like to know if there is a reason why is not advised to use
> checkpoints for this procedure.
> 
> Say for example that the job has externalized checkpoints with
> RETAIN_ON_CANCELLATION policy, one could cancel the job before the upgrade
> and use the retained checkpoint to restart the job from it once the Flink
> cluster is upgraded... or maybe I'm missing something ?
> 
> I performed some tests and we are able to upgrade using checkpoint, by
> passing the checkpoint path in the "flink run -s" parameter.
> 
> Could you help to clarify if this is advised (and supported) or we should
> stick to the use of savepoints for this kind of manipulations ?
> 
> 
> Thanks in advance for your help.
> 
> Edward
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Zookeeper shared by Flink and Kafka

2019-01-09 Thread Stefan Richter
Hi,

That is more a ZK question than a Flink question, but I don’t think there is a 
problem.

Best,
Stefan

> On 9. Jan 2019, at 13:31, min@ubs.com wrote:
> 
> Hi,
>  
> I am new to Flink.
>  
> I have a question:
> Can a zookeeper cluster be shared by a flink cluster and a kafka cluster?
>  
> Regards,
>  
> Min
> 


windowAll and AggregateFunction

2019-01-09 Thread CPC
Hi all,

In our implementation, we are consuming from Kafka and calculating a distinct
count with HyperLogLog. We are using the windowAll function with a custom
AggregateFunction, but the Flink runtime shows somewhat unexpected behavior.
Our sources run with parallelism 4, and I expected the add function to run
after the sources, computing partial results, and at the end of the window I
expected 4 HLL objects to be sent to a single operator and merged there (merge
function). Instead, it sends all data to a single instance and calls the add
function there.

Is there any way to make Flink behave like this? I mean: calculate partial
results after consuming from Kafka with the parallelism of the sources, without
shuffling (so part of the calculation can be done in parallel), and merge those
partial results with a merge function?

Thank you in advance...


Zookeeper shared by Flink and Kafka

2019-01-09 Thread min.tan
Hi,

I am new to Flink.

I have a question:
Can a zookeeper cluster be shared by a flink cluster and a kafka cluster?

Regards,

Min


Can checkpoints be used to migrate jobs between Flink versions ?

2019-01-09 Thread Edward Rojas
Hello,

For upgrading jobs between Flink versions I follow the guide in the doc
here:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/upgrading.html#upgrading-the-flink-framework-version

It states that we should always use savepoints for this procedure; I
followed it and it works perfectly.

I would just like to know whether there is a reason why it is not advised to
use checkpoints for this procedure.

Say, for example, that the job has externalized checkpoints with the
RETAIN_ON_CANCELLATION policy. One could cancel the job before the upgrade
and use the retained checkpoint to restart the job from it once the Flink
cluster is upgraded... or maybe I'm missing something?

I performed some tests and we were able to upgrade using a checkpoint, by
passing the checkpoint path in the "flink run -s" parameter.

Could you help clarify whether this is advised (and supported), or whether we
should stick to the use of savepoints for this kind of manipulation?


Thanks in advance for your help.

Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink Streaming Job Task is getting cancelled silently and causing job to restart

2019-01-09 Thread sohimankotia
Hi,

I am running Flink Streaming Job with 1.5.5 version.

- The job basically reads from Kafka, windows on 2 minutes, and writes to HDFS
using an Avro bucketing sink.
- The job runs with parallelism 132.
- Checkpointing is enabled with an interval of 1 minute.
- Savepoints are enabled and triggered every 30 min.


Few Modified Properties :

akka.ask.timeout: 15min
akka.client.timeout: 900s
akka.lookup.timeout: 60s
akka.tcp.timeout : 900s

akka.watch.heartbeat.interval: 120s
akka.watch.heartbeat.pause: 900s

Issues :

The job is getting restarted 3 to 4 times every day (at random times). It simply
says attempting to cancel task, with no exception or further logging. I tried to set

log4j.logger.org.apache.flink.runtime.taskmanager.Task=DEBUG,file

but nothing important is getting logged.

Enabling DEBUG logging at the Flink level makes the streaming application too slow
(so I cannot do that).

Attaching task logs:

task.gz


Thanks
Sohi







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: ConnectTimeoutException when createPartitionRequestClient

2019-01-09 Thread Wenrui Meng
Hi Till,

This job is not on AthenaX but on a special Uber version of Flink. I tried to
ping the connected host from the connecting host; it seems very stable. For the
connection timeout, I do set it to 20 min, but it still reports the timeout
after 2 minutes. Could you let me know how you test the timeout setting locally?

Thanks,
Wenrui

On Tue, Jan 8, 2019 at 7:06 AM Till Rohrmann  wrote:

> Hi Wenrui,
>
> the exception now occurs while finishing the connection creation. I'm not
> sure whether this is so different. Could it be that your network is
> overloaded or not very reliable? Have you tried running your Flink job
> outside of AthenaX?
>
> Cheers,
> Till
>
> On Tue, Jan 8, 2019 at 2:50 PM Wenrui Meng  wrote:
>
>> Hi Till,
>>
>> Thanks for your reply. Our cluster is Yarn cluster. I found that if we
>> decrease the total parallel the timeout issue can be avoided. But we do
>> need that amount of taskManagers to process data. In addition, once I
>> increase the netty server threads to 128, the error is changed to to
>> following error. It seems the cause is different. Could you help take a
>> look?
>>
>> 2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED.
>> java.io.IOException: Connecting the channel failed: Connecting to remote
>> task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This
>> might indicate that the remote task manager has been lost.
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
>> at
>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
>> at
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
>> at
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
>> at
>> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134)
>> at
>> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148)
>> at
>> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
>> at
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
>> at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by:
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466'
>> has failed. This might indicate that the remote task manager has been lost.
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>> at
>> 

Re: ConnectTimeoutException when createPartitionRequestClient

2019-01-09 Thread Till Rohrmann
Hi Wenrui,

I executed AutoParallelismITCase#testProgramWithAutoParallelism and set a
breakpoint in NettyClient.java:102 to see whether the configured timeout
value is correctly set. Moreover, I did the same for
AbstractNioChannel.java:207, and it looked as if the correct timeout value
was set.

What is the special uber Flink version? What patches does it include? Are
you able to run your tests with the latest vanilla Flink version?

Cheers,
Till

On Wed, Jan 9, 2019 at 10:40 AM Wenrui Meng  wrote:

> Hi Till,
>
> This job is not on AthenaX but on a special uber version Flink. I tried to
> ping the connected host from connecting host. It seems very stable. For the
> connection timeout, I do set it as 20min but it still report the timeout
> after 2 minutes. Could you let me know how do you test locally about the
> timeout setting?
>
> Thanks,
> Wenrui
>
> On Tue, Jan 8, 2019 at 7:06 AM Till Rohrmann  wrote:
>
>> Hi Wenrui,
>>
>> the exception now occurs while finishing the connection creation. I'm not
>> sure whether this is so different. Could it be that your network is
>> overloaded or not very reliable? Have you tried running your Flink job
>> outside of AthenaX?
>>
>> Cheers,
>> Till
>>
>> On Tue, Jan 8, 2019 at 2:50 PM Wenrui Meng  wrote:
>>
>>> Hi Till,
>>>
>>> Thanks for your reply. Our cluster is Yarn cluster. I found that if we
>>> decrease the total parallel the timeout issue can be avoided. But we do
>>> need that amount of taskManagers to process data. In addition, once I
>>> increase the netty server threads to 128, the error is changed to to
>>> following error. It seems the cause is different. Could you help take a
>>> look?
>>>
>>> 2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED.
>>> java.io.IOException: Connecting the channel failed: Connecting to remote
>>> task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This
>>> might indicate that the remote task manager has been lost.
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
>>> at
>>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148)
>>> at
>>> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by:
>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>> Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466'
>>> has failed. This might indicate that the remote task manager has been lost.
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
>>> at
>>>