Re: Streaming User-defined Aggregate Function gives exception when using Table API jars

2017-11-15 Thread Colin Williams
Thank you Fabian for fixing that. The highlight of my day today.


On Nov 15, 2017 1:01 AM, "Fabian Hueske"  wrote:

> Hi Colin,
>
> thanks for reporting the bug. I had a look at it and it seems that the
> wrong classloader is used when compiling the code (both for the batch as
> well as the streaming queries).
> I have a fix that I need to verify.
>
> It's not necessary to open a new JIRA for that. We can cover all cases
> under FLINK-7490.
>
> Thanks, Fabian
>
> 2017-11-15 5:32 GMT+01:00 Colin Williams  >:
>
>> From the documentation there is a note which instructs not to include the
>> flink-table dependency into the project. However when I put the flink-table
>> dependency on the cluster the User-defined Aggregate Function gives an
>> Exception.
>>
>> When I do include the flink-table into the dependencies, the project runs
>> just fine. However I'd expect that there will then be garbage collection
>> issues.
>>
>> This seems similar to https://issues.apache.org/jira/browse/FLINK-7490,
>> where I made a comment. I believe the issue is likely related to the
>> classloading as suggested, but the related classes are different (Batch vs
>> Stream).
>>
>> Should another bug report be filed?
>>
>> Also that bug report hasn't really had any activity and it's been a few
>> months.
>>
>> Best Regards,
>>
>> Colin Williams
>>
>>
>> java.io.IOException: Exception while applying AggregateFunction in
>> aggregating state
>> at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:91)
>> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>> at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
>> at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.compile(AggregateAggFunction.scala:33)
>> at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.initFunction(AggregateAggFunction.scala:72)
>> at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.createAccumulator(AggregateAggFunction.scala:41)
>> at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.createAccumulator(AggregateAggFunction.scala:33)
>> at org.apache.flink.runtime.state.heap.HeapAggregatingState$AggregateTransformation.apply(HeapAggregatingState.java:115)
>> at org.apache.flink.runtime.state.heap.NestedMapsStateTable.transform(NestedMapsStateTable.java:298)
>> at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:89)
>> ... 6 more
>> Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 14: Cannot determine simple type name "com"
>> at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
>> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416)
>> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177)
>> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
>> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
>> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
>> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6156)
>> at org.codehaus.janino.UnitCompiler.access$13300(UnitCompiler.java:212)
>> at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6064)
>> at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6059)
>> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3754)
>> at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6059)
>> at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6052)
>> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3753)
>> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
>> at org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:212)
>> at org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7844)
>> at org.codehaus.janino.IClass$IField.getDescriptor(IClass.java:1299)
>> at org.codehaus.janino.UnitCompiler.getfield(UnitCompiler.java:11439)
>> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4118)
>> at org.codehaus.janino.UnitCompiler.access$6800(UnitCompiler.java:212)
>> at org.codehaus.janino.UnitCompiler$12$1.visitFieldAccess(UnitCompiler.java:4053)
>> 
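
For reference, a minimal sketch (hypothetical class and field names, not the code
from the reported job) of the kind of Table API user-defined aggregate function
whose generated code fails to compile in the trace above, assuming the standard
org.apache.flink.table.functions.AggregateFunction base class:

import org.apache.flink.table.functions.AggregateFunction;

// Hypothetical UDAF that sums Long values; the accumulator is a plain POJO.
public class SumAgg extends AggregateFunction<Long, SumAgg.SumAcc> {

    public static class SumAcc {
        public long sum = 0L;
    }

    @Override
    public SumAcc createAccumulator() {
        return new SumAcc();
    }

    @Override
    public Long getValue(SumAcc acc) {
        return acc.sum;
    }

    // Discovered by reflection; called by the generated code for every input row.
    public void accumulate(SumAcc acc, Long value) {
        if (value != null) {
            acc.sum += value;
        }
    }
}

Registering such a function with the table environment (via registerFunction) and
using it in a windowed aggregation is what eventually leads to the runtime code
generation shown in the trace; the classloader fix discussed under FLINK-7490
targets that compilation step.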

Re: kafka consumer client seems not auto commit offset

2017-11-15 Thread Ashish Pokharel
Gordon, Tony,

Thought I would chime in real quick, as I have tested this a few different ways
in the last month (not sure if this will be helpful, but thought I'd throw it
out there). I actually haven't noticed any issue with auto committing with any of
those configs when using the Kafka property auto.offset.reset instead of those
methods. However, I have come across one interesting scenario: even when
Checkpointing is disabled BUT the App is started from a Savepoint, auto commit
doesn't seem to work. I am not sure if Tony has the same scenario. I assumed that
starting from a Savepoint sort of expects Checkpointing to be enabled, so that
offsets are committed the way they are when Checkpointing is enabled. At this
point, I am generating a random UID for my Kafka consumer (as I really don't want
to enable checkpointing; it's not really needed in my use case and I want to save
on some resources), but I do have some really slow-moving states which I'd like
to save on app shutdown etc.

Thanks, Ashish

> On Nov 15, 2017, at 4:22 AM, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi Tony,
> 
> Thanks for the report. At first glance of the description, what you described 
> doesn’t seem to match the expected behavior.
> I’ll spend some time later today to check this out.
> 
> Cheers,
> Gordon
> 
> 
> On 15 November 2017 at 5:08:34 PM, Tony Wei (tony19920...@gmail.com 
> ) wrote:
> 
>> Hi Gordon,
>> 
>> When I used FlinkKafkaConsumer010 to consume log from Kafka, I found that if 
>> I used `setStartFromLatest()` the kafka consumer api didn't auto commit 
>> offsets back to consumer group, but if I used `setStartFromGroupOffsets()` 
>> it worked fine.
>> 
>> I am sure that the configuration for Kafka has `auto.commit.interval.ms = 5000`
>> and `enable.auto.commit = true` and I didn't enable checkpointing.
>> 
>> The only difference is the change from `setStartFromGroupOffsets()` to 
>> `setStartFromLatest()`, but the auto commit mechanism just stopped working.
>> 
>> My Flink cluster version is 1.3.2.
>> My Kafka cluster version is 0.10.2.1.
>> My Zookeeper version for Kafka is 3.4.6-1569965, built on 02/20/2014 09:09 
>> GMT.
>> My Kafka connector library is "org.apache.flink" % 
>> "flink-connector-kafka-0.10_2.10" % "1.3.2"
>> 
>> Thanks for your help in advance.
>> 
>> Best Regards,
>> Tony Wei



ElasticSearch 6

2017-11-15 Thread Fritz Budiyanto
Hi All,

ES6 is GA today, and I wonder if the Flink ES5 connector fully supports ES6? Any 
caveats we need to know about?

Thanks,
Fritz

[Flink] How to Converting DataStream to Dataset or Table?

2017-11-15 Thread Richard Xin
I have a DataStream. Is there a way to convert it to a DataSet or a Table so that I 
could sort it and persist it to a file?
Thanks a lot!
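
For what it's worth, a minimal sketch (hypothetical field names, assuming flink-table
is available on the classpath) of the Table conversion half of the question via
StreamTableEnvironment; whether and how an unbounded stream can then be sorted and
persisted is a separate matter that is not answered here:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class StreamToTableSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        DataStream<Tuple2<String, Integer>> stream =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));

        // Expose the DataStream as a Table with named fields.
        Table table = tableEnv.fromDataStream(stream, "word, cnt");

        // Convert back to a DataStream (append mode) so a sink can write it out.
        tableEnv.toAppendStream(table, Row.class).print();

        env.execute("stream-to-table-sketch");
    }
}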

Re: Apache Flink - Question about TriggerResult.FIRE

2017-11-15 Thread M Singh
Hi Guys,
Is there any insight into this?
Thanks,
Mans

On Monday, November 13, 2017 11:19 AM, M Singh  wrote:
 

Hi Flink Users,
I have a few questions about triggers:
If a trigger returns TriggerResult.FIRE from, say, the onProcessingTime method, the 
window computation is triggered but the elements are kept in the window. If there is 
a second invocation of the onProcessingTime method, will the elements from the 
previous firing (which were not purged) be part of the new window computation, along 
with the new events added since the last FIRE event?
Secondly, how does the FIRE option affect the sliding window computation?
If there are any other insights/pitfalls while dealing with this, please let me 
know.
Thanks
Mans
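
For illustration, a minimal sketch of a custom processing-time trigger (hypothetical
interval, not taken from the thread) that returns FIRE without purging. As far as I
know, FIRE leaves the window contents in place, so the next firing evaluates the old
elements plus whatever arrived since, while FIRE_AND_PURGE would clear them first:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class PeriodicFireTrigger extends Trigger<Object, TimeWindow> {

    private static final long INTERVAL_MS = 10_000L; // assumed firing interval

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) {
        // For brevity a timer is registered per element; a production trigger would
        // track and de-duplicate timers (e.g. via partitioned state).
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + INTERVAL_MS);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        // FIRE evaluates the window but keeps its elements, so the next firing sees
        // the previously evaluated elements plus everything added since.
        ctx.registerProcessingTimeTimer(time + INTERVAL_MS);
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        // A real implementation would delete any timers registered for this window here.
    }
}

For sliding windows the trigger is evaluated per window instance, so the same
keep-versus-purge behavior applies to each of the overlapping windows independently.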



   

Re: Apache Flink - Question about Global Windows

2017-11-15 Thread M Singh
Hi Xingcan,
Thanks for your response.
So to summarize: global windows can be applied to both keyed and non-keyed streams; 
we only have to specify a trigger with them to invoke the computation function.
Thanks,
Mans

On Wednesday, November 15, 2017 5:43 AM, Xingcan Cui  
wrote:
 

 Hi Mans,
the "global" here indicates the "horizontal" (count, time, etc.) dimension 
instead of the "vertical" (keyBy) dimension, i.e., all the received data will 
be placed into a single huge window. Actually, it's an orthogonal concept with 
the KeyBy operations since both DataStream and KeyedStream can define their own 
global windows. Compared with other windows (e.g., tumbling or sliding ones), 
it's more flexible to implement your own triggers on it. 
Hope that helps.
Best,
Xingcan
On Wed, Nov 15, 2017 at 2:12 AM, M Singh  wrote:

Hi:
I am reading about global windows and the documentation indicates:
'A global windows assigner assigns all elements with the same key to the same 
single global window'

From my understanding, if we have a keyed stream, then all elements with the 
same key are also assigned to a single window. I understand that global 
windows never trigger window computation. But is there any other 
difference between the two windows (global vs non-global)?
Thanks
Mans





   

Re: R/W traffic estimation between Flink and Zookeeper

2017-11-15 Thread Hao Sun
Thanks Piotr, does Flink read/write to ZooKeeper every time it processes a
record?
I thought only JM uses ZK to keep some meta level data, not sure why `it
depends on many things like state backend used, state size, complexity of
your application, size of the records, number of machines, their hardware
and the network.`

On Thu, Oct 12, 2017 at 1:35 AM Piotr Nowojski 
wrote:

> Hi,
>
> Are you asking how to measure records/s, or whether it is possible to achieve it?
> To measure it you can check numRecordsInPerSecond metric.
>
> As for whether 1000 records/s is possible, it depends on many things like state
> backend used, state size, complexity of your application, size of the
> records, number of machines, their hardware and the network. In the very
> simplest cases it is possible to achieve millions of records per second per
> machine. It would be best to try it out in your particular use case on some
> small scale.
>
> Piotrek
>
> > On 11 Oct 2017, at 19:58, Hao Sun  wrote:
> >
> > Hi Is there a way to estimate read/write traffic between flink and zk?
> > I am looking for something like 1000 reads/sec or 1000 writes/sec. And
> the size of the message.
> >
> > Thanks
>
>


external checkpoints

2017-11-15 Thread Aviad Rotem
Hi,
I have several jobs which are configured for externalized checkpoints
(enableExternalizedCheckpoints).
How can I correlate checkpoints with jobs?
For example, I want to write a script which monitors whether a job is up or
not, and if the job is down it will resume the job from the externalized
checkpoint.
How can I know which checkpoint belongs to a specific job?

Can I configure each job to write its externalized checkpoints to a
different location?

my configuration is:
*state.backend*: rocksdb
*state.backend.fs.checkpointdir*: s3a://flink-bucket/backend/checkpoints
*state.checkpoints.dir*: s3a://flink-bucket/checkpoints

and in the code I set:
enableCheckpointing
enableExternalizedCheckpoints
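
For reference, a minimal sketch (hypothetical paths and intervals, assuming the
Flink 1.3-style APIs) of the in-code part of that setup. The state backend's
checkpoint data directory can be given per job through the backend constructor;
whether the externalized checkpoint metadata under state.checkpoints.dir can be
separated per job the same way is not answered here:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExternalizedCheckpointsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60s (hypothetical interval).
        env.enableCheckpointing(60_000L);

        // Retain externalized checkpoints when the job is cancelled, so a monitoring
        // script can resume from them later.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Per-job checkpoint data directory (hypothetical path) instead of the
        // cluster-wide state.backend.fs.checkpointdir default.
        env.setStateBackend(new RocksDBStateBackend("s3a://flink-bucket/backend/checkpoints/job-a"));

        env.fromElements(1, 2, 3).print();
        env.execute("externalized-checkpoints-example");
    }
}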


Re: Apache Flink - Question about Global Windows

2017-11-15 Thread Xingcan Cui
Hi Mans,

the "global" here indicates the "horizontal" (count, time, etc.) dimension
instead of the "vertical" (keyBy) dimension, i.e., all the received data
will be placed into a single huge window. Actually, it's an orthogonal
concept with the *KeyBy* operations since both *DataStream* and
*KeyedStream* can define their own global windows. Compared with other
windows (e.g., tumbling or sliding ones), it's more flexible to implement
your own triggers on it.
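
For illustration, a minimal sketch (hypothetical data and names, assuming the Java
DataStream API) of what this looks like: a global window on a keyed stream that only
emits results because an explicit trigger is attached.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

public class GlobalWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> input = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3));

        input
            .keyBy(0)                            // the "vertical" (keyBy) dimension
            .window(GlobalWindows.create())      // the "horizontal" dimension: one huge window per key
            .trigger(CountTrigger.of(2))         // without an explicit trigger nothing ever fires
            .sum(1)
            .print();

        env.execute("global-window-example");
    }
}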

Hope that helps.

Best,
Xingcan

On Wed, Nov 15, 2017 at 2:12 AM, M Singh  wrote:

> Hi:
>
> I am reading about global windows and the documentation indicates:
>
> 'A *global windows* assigner assigns all elements with the same key to
> the same single *global window'*
>
> From my understanding if we have a keyed stream - then all elements with
> the same key are also assigned to a single window.  I understand that
> global windows never trigger window computation.  But is there any other
> difference between the two windows (global vs non-global)?
>
> Thanks
>
> Mans
>
>
>


RestartStrategies & checkpoints

2017-11-15 Thread Aviad Rotem
Hi,

if my job is configured with setRestartStrategy and with enableCheckpointing(),
in case of failure and job restart, does it restart from the latest
checkpoint or does it restart without any checkpoint?
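
For reference, a minimal sketch (hypothetical intervals) of that combination. As far
as I know, when checkpointing is enabled the automatic restart triggered by the
restart strategy recovers from the latest completed checkpoint, whereas without
checkpointing it restarts with empty state:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartWithCheckpoints {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 30s; on failure the job is restarted by the strategy below
        // and, with checkpointing enabled, recovery starts from the latest completed checkpoint.
        env.enableCheckpointing(30_000L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10_000L));

        env.fromElements(1, 2, 3).print();
        env.execute("restart-strategy-with-checkpointing");
    }
}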


Re: Off heap memory issue

2017-11-15 Thread Piotr Nowojski
Hi,

I have been able to observe some off-heap memory “issues” by submitting the Kafka 
job provided by Javier Lopez (in a different mailing thread). 

TL;DR;

There was no memory leak; it is just that the memory pools “Metaspace” and 
“Compressed Class Space” grow in size over time and are only rarely garbage collected. 
In my test case they together were wasting up to ~7GB of memory, while the test 
case could use as little as ~100MB. Connect to your JVM with, for example, jconsole, 
check their sizes, and cut them in half by setting:

env.java.opts: -XX:CompressedClassSpaceSize=***M -XX:MaxMetaspaceSize=***M

In flink-conf.yaml. Everything works fine and memory consumption still too 
high? Rinse and repeat.
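
Not part of the original message: a small sketch of checking the same two pools
programmatically with the plain JDK MemoryPoolMXBean API instead of attaching
jconsole, e.g. from a little utility run inside the JVM in question:

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

public class NonHeapPoolSizes {
    public static void main(String[] args) {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            String name = pool.getName();
            if (name.contains("Metaspace") || name.contains("Compressed Class Space")) {
                MemoryUsage usage = pool.getUsage();
                long usedMb = usage.getUsed() / (1024 * 1024);
                long max = usage.getMax();
                String maxText = max < 0 ? "unlimited" : (max / (1024 * 1024)) + " MB";
                System.out.println(name + ": used=" + usedMb + " MB, max=" + maxText);
            }
        }
    }
}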


Long story:

With default settings and a max heap size of 1GB, the memory consumption of the 
non-heap memory pools “Metaspace” and “Compressed Class Space” was growing over 
time, seemingly indefinitely, and Metaspace was always around ~6 times larger than 
Compressed Class Space. The default max Metaspace size is unlimited, while 
“Compressed Class Space” has a default max size of 1GB. 

When I decreased CompressedClassSpaceSize down to 100MB, memory consumption 
grew up to 90MB and then started bouncing up and down by a couple of MB. 
“Metaspace” was following the same pattern, but using ~600MB. When I decreased 
MaxMetaspaceSize to 200MB, the memory consumption of both pools was bouncing 
around ~220MB.

It seems like there are no general guidelines on how to configure those values, 
since it is heavily application dependent. However, this seems like the most 
likely suspect for the apparent OFF HEAP “memory leak” that was reported a couple 
of times in use cases where users are submitting hundreds/thousands of jobs to a 
Flink cluster. For more information please check here:

https://docs.oracle.com/javase/8/docs/technotes/guides/vm/gctuning/considerations.html
 


Please let us know if this solves your issues.

Thanks, Piotrek

> On 13 Nov 2017, at 16:06, Flavio Pompermaier  wrote:
> 
> Unfortunately the issue I've opened [1] was not a problem of Flink but was 
> just caused by an ever increasing job plan.
> So no help from that... Let's hope to find out the real source of the problem.
> Maybe using -Djdk.nio.maxCachedBufferSize could help (but I didn't try it yet).
> 
> Best,
> Flavio
> 
> [1] https://issues.apache.org/jira/browse/FLINK-7845 
> 
> 
> On Wed, Oct 18, 2017 at 2:07 PM, Kien Truong  > wrote:
> Hi,
> 
> We saw a similar issue in one of our job due to ByteBuffer memory leak[1]. 
> We fixed it using the solution in the article, setting 
> -Djdk.nio.maxCachedBufferSize
> 
> This variable is available for Java > 8u102
> 
> Best regards,
> 
> Kien
> [1]http://www.evanjones.ca/java-bytebuffer-leak.html 
> 
> 
> On 10/18/2017 4:06 PM, Flavio Pompermaier wrote:
>> We also faced the same problem, but the number of jobs we can run before 
>> restarting the cluster depends on the volume of the data to shuffle around 
>> the network. We even had problems with a single job and in order to avoid 
>> OOM issues we had to put some configuration to limit Netty memory usage, 
>> i.e.:
>>  - Add to flink.yaml -> env.java.opts: 
>> -Dio.netty.recycler.maxCapacity.default=1
>>  - Edit taskmanager.sh and change TM_MAX_OFFHEAP_SIZE from 8388607T to 5g
>> 
>> For this purpose we wrote a small test to reproduce the problem and we opened 
>> an issue for that [1].
>> We still don't know if the problems are related however..
>> 
>> I hope that could be helpful,
>> Flavio
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-7845 
>> 
>> 
>> On Wed, Oct 18, 2017 at 10:48 AM, Javier Lopez > > wrote:
>> Hi Robert,
>> 
>> Sorry to reply this late. We did a lot of tests, trying to identify if the 
>> problem was in our custom sources/sinks. We figured out that none of our 
>> custom components is causing this problem. We came up with a small test, and 
>> realized that the Flink nodes run out of non-heap JVM memory and crash after 
>> deployment of thousands of jobs. 
>> 
>> When rapidly deploying thousands or hundreds of thousands of Flink jobs - 
>> depending on job complexity in terms of resource consumption - Flink nodes 
>> non-heap JVM memory consumption grows until there is no more memory left on 
>> the machine and the Flink process crashes. Both TaskManagers and JobManager 
>> exhibit the same behavior. The TaskManagers die faster though. The memory 
>> consumption doesn't decrease after stopping the deployment of new jobs, with 
>> the cluster being idle (no running jobs). 
>> 
>> We could replicate the behavior 

Re: Model serving in Flink DataStream

2017-11-15 Thread Andrea Spina
Hi Adarsh,
we developed flink-JPMML for streaming model serving on top of the
PMML format and, of course, Flink. We haven't released any official benchmark
numbers yet, but we didn't bump into any performance issues while using the
library. In terms of throughput and latency it doesn't require more
effort than using the Flink streaming APIs themselves.

What can happen is high memory usage if you're deploying thousands of
(fat) models at a time within the same pipeline, but this was a design
choice (you can see the explanation here:
https://www.youtube.com/watch?v=0rWvMZ6JSD8=17s). 

AFAIK the lib is already deployed in a couple of projects. Don't hesitate to
write on Github issues if you have more questions.

https://github.com/FlinkML/flink-jpmml

Cheers,

Andrea



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Issue with back pressure and AsyncFunction

2017-11-15 Thread Aljoscha Krettek
Hi,

Unfortunately, I don't have anything to add. Yes, back pressure doesn't work 
correctly for functions that do work outside the main thread, and iterations 
currently don't work well and can lead to deadlocks.

Did you already open issues for those by now?

Best,
Aljoscha

> On 10. Nov 2017, at 22:46, Ufuk Celebi  wrote:
> 
> Hey Ken,
> 
> thanks for your message. Both your comments are correct (see inline).
> 
> On Fri, Nov 10, 2017 at 10:31 PM, Ken Krugler
>  wrote:
>> 1. A downstream function in the iteration was (significantly) increasing the
>> number of tuples - it would get one in, and sometimes emit 100+.
>> 
>> The output would loop back as input via the iteration.
>> 
>> This eventually caused the network buffers to fill up, and that’s why the
>> job got stuck.
>> 
>> I had to add my own tracking/throttling in one of my custom function, to
>> avoid having too many “active” tuples.
>> 
>> So maybe something to note in documentation on iterations, if it’s not there
>> already.
> 
> Yes, iterations are prone to deadlock due to the way that data is
> exchanged between the sink and head nodes. There have been multiple
> attempts to fix these shortcomings, but I don't know what the latest
> state is. Maybe Aljoscha (CC'd) has some input...
> 
>> 2. The back pressure calculation doesn’t take into account AsyncIO
> 
> Correct, the back pressure monitoring only takes the main task thread
> into account. Every operator that uses a separate thread to emit
> records (like the Async I/O or Kafka source) is therefore not covered by
> the back pressure monitoring.
> 
> – Ufuk



Re: kafka consumer client seems not auto commit offset

2017-11-15 Thread Tzu-Li (Gordon) Tai
Hi Tony,

Thanks for the report. At first glance of the description, what you described 
doesn’t seem to match the expected behavior.
I’ll spend some time later today to check this out.

Cheers,
Gordon


On 15 November 2017 at 5:08:34 PM, Tony Wei (tony19920...@gmail.com) wrote:

Hi Gordon,

When I used FlinkKafkaConsumer010 to consume log from Kafka, I found that if I 
used `setStartFromLatest()` the kafka consumer api didn't auto commit offsets 
back to consumer group, but if I used `setStartFromGroupOffsets()` it worked 
fine.

I am sure that the configuration for Kafka has `auto.commit.interval.ms = 5000` 
and `enable.auto.commit = true` and I didn't enable checkpointing.

The only difference is the change from `setStartFromGroupOffsets()` to 
`setStartFromLatest()`, but the auto commit mechanism just stopped working.

My Flink cluster version is 1.3.2.
My Kafka cluster version is 0.10.2.1.
My Zookeeper version for Kafka is 3.4.6-1569965, built on 02/20/2014 09:09 GMT.
My Kafka connector library is "org.apache.flink" % 
"flink-connector-kafka-0.10_2.10" % "1.3.2"

Thanks for your help in advance.

Best Regards,
Tony Wei

kafka consumer client seems not auto commit offset

2017-11-15 Thread Tony Wei
Hi Gordon,

When I used FlinkKafkaConsumer010 to consume log from Kafka, I found that
if I used `setStartFromLatest()` the kafka consumer api didn't auto commit
offsets back to consumer group, but if I used `setStartFromGroupOffsets()`
it worked fine.

I am sure that the configuration for Kafka has `auto.commit.interval.ms =
5000` and `enable.auto.commit = true` and I didn't enable checkpointing.

The only difference is the change from `setStartFromGroupOffsets()` to
`setStartFromLatest()`, but the auto commit mechanism just stopped working.

My Flink cluster version is 1.3.2.
My Kafka cluster version is 0.10.2.1.
My Zookeeper version for Kafka is 3.4.6-1569965, built on 02/20/2014 09:09
GMT.
My Kafka connector library is "org.apache.flink" %
"flink-connector-kafka-0.10_2.10" % "1.3.2"

Thanks for your help in advance.

Best Regards,
Tony Wei
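
For reference, a minimal sketch of the setup described above (hypothetical topic,
group and broker address; assuming Flink 1.3.x with flink-connector-kafka-0.10):
Kafka-side auto commit enabled, checkpointing not enabled, and the start position
switched from group offsets to latest:

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class AutoCommitRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is intentionally NOT enabled, so offset committing relies on
        // the Kafka client's own auto-commit mechanism.

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.setProperty("group.id", "my-consumer-group");       // hypothetical group
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("logs", new SimpleStringSchema(), props);

        // With setStartFromGroupOffsets() auto commit reportedly works; with
        // setStartFromLatest() it reportedly stops.
        consumer.setStartFromLatest();

        env.addSource(consumer).print();
        env.execute("kafka-auto-commit-repro");
    }
}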


Re: Streaming User-defined Aggregate Function gives exception when using Table API jars

2017-11-15 Thread Fabian Hueske
Hi Colin,

thanks for reporting the bug. I had a look at it and it seems that the
wrong classloader is used when compiling the code (both for the batch as
well as the streaming queries).
I have a fix that I need to verify.

It's not necessary to open a new JIRA for that. We can cover all cases
under FLINK-7490.

Thanks, Fabian

2017-11-15 5:32 GMT+01:00 Colin Williams :

> From the documentation there is a note which instructs not to include the
> flink-table dependency into the project. However when I put the flink-table
> dependency on the cluster the User-defined Aggregate Function gives an
> Exception.
>
> When I do include the flink-table into the dependencies, the project runs
> just fine. However I'd expect that there will then be garbage collection
> issues.
>
> This seems similar to https://issues.apache.org/jira/browse/FLINK-7490,
> where I made a comment. I believe the issue is likely related to the
> classloading as suggested, but the related classes are different (Batch vs
> Stream).
>
> Should another bug report be filed?
>
> Also that bug report hasn't really had any activity and it's been a few
> months.
>
> Best Regards,
>
> Colin Williams
>
>
> java.io.IOException: Exception while applying AggregateFunction in
> aggregating state
> at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:91)
> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
> at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.compile(AggregateAggFunction.scala:33)
> at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.initFunction(AggregateAggFunction.scala:72)
> at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.createAccumulator(AggregateAggFunction.scala:41)
> at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.createAccumulator(AggregateAggFunction.scala:33)
> at org.apache.flink.runtime.state.heap.HeapAggregatingState$AggregateTransformation.apply(HeapAggregatingState.java:115)
> at org.apache.flink.runtime.state.heap.NestedMapsStateTable.transform(NestedMapsStateTable.java:298)
> at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:89)
> ... 6 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 14: Cannot determine simple type name "com"
> at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416)
> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177)
> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6156)
> at org.codehaus.janino.UnitCompiler.access$13300(UnitCompiler.java:212)
> at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6064)
> at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6059)
> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3754)
> at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6059)
> at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6052)
> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3753)
> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
> at org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:212)
> at org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7844)
> at org.codehaus.janino.IClass$IField.getDescriptor(IClass.java:1299)
> at org.codehaus.janino.UnitCompiler.getfield(UnitCompiler.java:11439)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4118)
> at org.codehaus.janino.UnitCompiler.access$6800(UnitCompiler.java:212)
> at org.codehaus.janino.UnitCompiler$12$1.visitFieldAccess(UnitCompiler.java:4053)
> at org.codehaus.janino.UnitCompiler$12$1.visitFieldAccess(UnitCompiler.java:4048)
> at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4136)
> at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4048)
> at 

Re: Some questions regarding flink security features

2017-11-15 Thread Divansh Arora
2. The data on the wire, as I have understood, is encrypted by SSL. Is this
correct?

Thanks


On Wed, Nov 15, 2017 at 2:02 PM, Divansh Arora  wrote:

> Hi
> I am Divansh.
>
> I have a few questions regarding security features in flink as we need to
> use flink like software in our product. Sorry in advance if I ask anything
> stupid as I'm a newbie and nt
>
> Questions:
>
> 1. Is there any way to take encrypted data in flink from client along with
> required credentials and then decrypt that to perform transformations? If
> so, can I also output data in encrypted form?
>
> 2.
>


Some questions regarding flink security features

2017-11-15 Thread Divansh Arora
Hi
I am Divansh.

I have a few questions regarding security features in flink as we need to
use flink like software in our product. Sorry in advance if I ask anything
stupid as I'm a newbie and nt

Questions:

1. Is there any way to take encrypted data in flink from client along with
required credentials and then decrypt that to perform transformations? If
so, can I also output data in encrypted form?

2.