Flink 1.4.0 release commit

2018-06-11 Thread Abdul Qadeer
Hi!

I was trying to find out the commit from which 1.4.0 was released. The
release was on 29th November 2017 but I am not able to find any commits
around that date to verify this. Any help appreciated.


Re: DataStreamCalcRule grows beyond 64 KB

2018-06-11 Thread Hequn Cheng
Hi rakeshchalasani,

At the moment Flink only splits the generated methods by field to avoid the 64 KB problem, so
the current implementation still hits the limit if the code generated for a single field becomes
too large. The Flink community has already planned to solve this problem, see [1]. As
a workaround, you can define your own UDF to avoid the problem (see the sketch below). The UDF
is serialized, so it will not cause code bloat in the generated class.

Best, Hequn

[1] https://issues.apache.org/jira/browse/FLINK-8920
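
For illustration, a minimal sketch of that workaround (the function name, its logic and the
column names are hypothetical; ScalarFunction and registerFunction are the standard Table API
hooks for user-defined functions):

import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical UDF that wraps the complex per-field expression in a single call,
// so the generated processElement() only contains one short method invocation.
public class BigFieldExpression extends ScalarFunction {
    public String eval(String a, String b) {
        // put the complex field logic here instead of inlining it in the SQL/Table expression
        return a + "|" + b;
    }
}

// Registration and usage (tableEnv and the column names are assumptions):
// tableEnv.registerFunction("bigExpr", new BigFieldExpression());
// Table result = tableEnv.sqlQuery("SELECT bigExpr(colA, colB) FROM myTable");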

On Tue, Jun 12, 2018 at 5:31 AM, rakeshchalasani wrote:

> Hi,
>
> We hit a situation where the code generation on Flink grows beyond 64KB and
> fails. Spark SQL has a similar issue and it automatically disables
> code-generation in such a case. Any way we can control that here? Following
> is the error stack:
>
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
> at org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
> at org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
> at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.codehaus.janino.InternalCompilerException: Compiling "DataStreamCalcRule$20583": Code of method "processElement(Ljava/lang/Object;Lorg/apache/flink/streaming/api/functions/ProcessFunction$Context;Lorg/apache/flink/util/Collector;)V" of class "DataStreamCalcRule$20583" grows beyond 64 KB
> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
> at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:213)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
> ... 9 common frames omitted
> Caused by: org.codehaus.janino.InternalCompilerException: Code of method "processElement(Ljava/lang/Object;Lorg/apache/flink/streaming/api/functions/ProcessFunction$Context;Lorg/apache/flink/util/Collector;)V" of class "DataStreamCalcRule$20583" grows beyond 64 KB
> at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:990)
> at org.codehaus.janino.CodeContext.write(CodeContext.java:867)
> at org.codehaus.janino.UnitCompiler.writeOpcode(UnitCompiler.java:11901)
> at org.codehaus.janino.UnitCompiler.store(UnitCompiler.java:11576)
> at org.codehaus.janino.UnitCompiler.store(UnitCompiler.java:11560)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2573)
> at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:212)
> at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1482)
> at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1466)
> at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3351)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
> at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1546)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1532)
> at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:212)
> at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1472)
> at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1466)
> at org.codehaus.janino.Java$Block.accept(Java.java:2756)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2444)
> at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:212)
> at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1474)
> at
>

How to submit two Flink jobs to the same Flink cluster?

2018-06-11 Thread Angelica
I have a Flink Standalone Cluster based on Flink 1.4.2 (1 job manager, 4 task
slots) and want to submit two different Flink programs. 
Not sure if this is possible at all, as some Flink mailing list archives say that a job
manager can only run one job. If this is true, any ideas how I can get
around this issue? There is only one machine available for the Flink cluster
and we don't want to use any resource manager such as Mesos or YARN.

Any hints?






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


DataStreamCalcRule grows beyond 64 KB

2018-06-11 Thread rakeshchalasani
Hi,

We hit a situation where the code generated by Flink grows beyond 64 KB and
compilation fails. Spark SQL has a similar issue and automatically disables
code generation in such a case. Is there any way we can control that here? Following
is the error stack:

org.apache.flink.api.common.InvalidProgramException: Table program cannot be
compiled. This is a bug. Please file an issue.
at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
at org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
at org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.codehaus.janino.InternalCompilerException: Compiling "DataStreamCalcRule$20583": Code of method "processElement(Ljava/lang/Object;Lorg/apache/flink/streaming/api/functions/ProcessFunction$Context;Lorg/apache/flink/util/Collector;)V" of class "DataStreamCalcRule$20583" grows beyond 64 KB
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:213)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
... 9 common frames omitted
Caused by: org.codehaus.janino.InternalCompilerException: Code of method "processElement(Ljava/lang/Object;Lorg/apache/flink/streaming/api/functions/ProcessFunction$Context;Lorg/apache/flink/util/Collector;)V" of class "DataStreamCalcRule$20583" grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:990)
at org.codehaus.janino.CodeContext.write(CodeContext.java:867)
at org.codehaus.janino.UnitCompiler.writeOpcode(UnitCompiler.java:11901)
at org.codehaus.janino.UnitCompiler.store(UnitCompiler.java:11576)
at org.codehaus.janino.UnitCompiler.store(UnitCompiler.java:11560)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2573)
at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1482)
at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1466)
at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3351)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1546)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1532)
at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1472)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1466)
at org.codehaus.janino.Java$Block.accept(Java.java:2756)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2444)
at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1474)
at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1466)
at org.codehaus.janino.Java$IfStatement.accept(Java.java:2926)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1546)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1532)
at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1472)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1466)
at org.codehaus.janino.Java$Block.accept(Java.java:2756)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1466)
 

Re: Flink 1.6 release note!!

2018-06-11 Thread vipul singh
I think you are looking for this?
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Flink-1-6-features-tc20502.html

1.6 release notes as per current website:
https://ci.apache.org/projects/flink/flink-docs-master/release-notes/flink-1.6.html

Recently 1.5 was released:
https://flink.apache.org/news/2018/05/25/release-1.5.0.html



On Mon, Jun 11, 2018 at 10:24 AM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Hi
>
> can anybody please send the link or ref document for 1.6 release.
>
> --
> Cheers
>
> Puneet Kinra
>
> Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com
>
> e-mail: puneet.ki...@customercentria.com
>

-- 
Thanks,
Vipul


Flink 1.6 release note!!

2018-06-11 Thread Puneet Kinra
Hi

Can anybody please send the link or reference document for the 1.6 release?

-- 
Cheers

Puneet Kinra

Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com

e-mail: puneet.ki...@customercentria.com


Ask about aggeragtion on joined streams

2018-06-11 Thread Rad Rad


Hi, 

Could you help me with computing aggregations, such as AVG, over two joined
streams?


FirstStream.join(SecondStream)
.where(new FirstKeySelector())
.equalTo(new SecondKeySelector())

.window(TumblingEventTimeWindows.of(Time.milliseconds(1)))
.apply(joinStreamFunc).print();
                // here I need to add avg(field 1)


Regards. 
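
For illustration, one possible direction is to aggregate the joined output in a second keyed
window. This is only a sketch: EventA, EventB, the key selectors and the field accessors are
hypothetical, and the AggregateFunction below simply averages the Double field of the joined
tuples.

DataStream<Tuple2<String, Double>> joined = firstStream
        .join(secondStream)
        .where(new FirstKeySelector())
        .equalTo(new SecondKeySelector())
        .window(TumblingEventTimeWindows.of(Time.milliseconds(1)))
        .apply(new JoinFunction<EventA, EventB, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> join(EventA a, EventB b) {
                // keep the join key plus the numeric field to average ("field 1")
                return Tuple2.of(a.getKey(), b.getField1());
            }
        });

DataStream<Double> averages = joined
        .keyBy(0)                                                   // key by the join key
        .window(TumblingEventTimeWindows.of(Time.milliseconds(1)))
        .aggregate(new AverageAggregate());                         // AVG of field 1, defined below

// A simple AggregateFunction computing the average of the Double field (f1).
public static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Double>, Tuple2<Double, Long>, Double> {
    @Override public Tuple2<Double, Long> createAccumulator() { return Tuple2.of(0.0, 0L); }
    @Override public Tuple2<Double, Long> add(Tuple2<String, Double> value, Tuple2<Double, Long> acc) {
        return Tuple2.of(acc.f0 + value.f1, acc.f1 + 1);
    }
    @Override public Double getResult(Tuple2<Double, Long> acc) {
        return acc.f1 == 0 ? 0.0 : acc.f0 / acc.f1;
    }
    @Override public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}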



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: how to emit record to downstream operator in snapshotState and/or onProcessingTime

2018-06-11 Thread Steven Wu
Pirotr,

> However you could do it via a custom Operator (there you have a constant
access to output collector).

Can you elaborate on that a little bit? Are you referring to the
"Output<StreamRecord<OUT>> output" field in the AbstractStreamOperator class?

> register processing time service in your ProcessFunction.

I think your timer proposal can work.

I was originally registering the timer like below. The ProcessingTimeCallback interface
doesn't supply the Collector parameter:

((StreamingRuntimeContext) getRuntimeContext())
.getProcessingTimeService()
.registerTimer(..., this);

Thanks,
Steven



On Mon, Jun 11, 2018 at 2:52 AM, Piotr Nowojski 
wrote:

> Hi,
>
> Indeed it seems like this is not possible to emit records on
> checkpoint/snapshot through ProcessFunction. However you could do it via a
> custom Operator (there you have a constant access to output collector).
> Another workaround might be to register processing time service in your
> ProcessFunction.
>
> @Override
> public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
>    ctx.timerService().registerProcessingTimeTimer(...);
> }
>
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
>    // …
> }
>
> Piotrek
>
> On 11 Jun 2018, at 01:07, Steven Wu  wrote:
>
> I have a process function defined with these interfaces
>
> public class MyProcessFunction extends ProcessFunction
> implements CheckpointedFunction, ProcessingTimeCallback {...}
>
> In snapshotState() method, I want to close files and emit the metadata
> about the closed files to downstream operator. it doesn't seem possible
> with *snapshotState(FunctionSnapshotContext context*) interface.
>
> I can keep metadata in snapshot and restore them during recovery. but if
> there is no input record coming for a long time, * processElement(T
> value, Context ctx, Collector out)* won't be called. Then I
> can't forward the restored data to downstream operator with guaranteed
> latency.
>
> I can add a timer. but it doesn't seem that *onProcessingTime(long
> timestamp)* allows me to forward output to downstream operator either.
>
> Thanks,
> Steven
>
>
>


Re: Flink kafka consumer stopped committing offsets

2018-06-11 Thread amit pal
Probably your Kafka consumer is rebalancing. This can be due to a long
message processing time, because of which the Kafka broker marks your consumer
as dead and rebalances the group. This all happens before the consumer can commit the
offsets.
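
For illustration, a sketch of the consumer properties one might tune in that case (they are
passed to the FlinkKafkaConsumer constructor; the values are only examples and the right ones
depend on your processing time and Kafka client version):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "my-kafka-host:9092");   // as in the original config
props.setProperty("group.id", "my_group");
// Give slow record processing more headroom before the broker considers the
// consumer dead and triggers a rebalance (illustrative values only):
props.setProperty("max.poll.records", "250");        // fewer records per poll
props.setProperty("session.timeout.ms", "30000");    // longer session timeout
props.setProperty("heartbeat.interval.ms", "10000"); // must stay below the session timeout

FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);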

On Mon, Jun 11, 2018 at 7:37 PM Piotr Nowojski 
wrote:

> The more I look into it, the more it seems like a Kafka bug or some
> cluster failure from which your Kafka cluster did not recover.
>
> In your cases auto committing should be set to true and in that case
> KafkaConsumer should commit offsets once every so often when it’s polling
> messages. Unless, for example, `coordinatorUnknown()` returns true in
> `org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#maybeAutoCommitOffsetsAsync`
> (Kafka 0.10.2.1 code base):
>
> private void maybeAutoCommitOffsetsAsync(long now) {
> if (autoCommitEnabled) {
> if (coordinatorUnknown()) {
> this.nextAutoCommitDeadline = now + retryBackoffMs;
> } else if (now >= nextAutoCommitDeadline) {
> this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
> doAutoCommitOffsetsAsync();
> }
> }
> }
>
> Have you checked Kafka logs? This suggests that the real problem is hidden
> behind:
>
> >  INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator
> > my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) dead for group
> > aggregate-all_server_measurements_combined-20180606-1000
>
> And maybe your Kafka cluster/consumer can not recover from this situation.
>
> Another thing to try (simpler) is to just trying upgrading Kafka cluster.
>
> Piotrek
>
> On 11 Jun 2018, at 11:44, Juho Autio  wrote:
>
> Hi Piotr, thanks for your insights.
>
> > What’s your KafkaConsumer configuration?
>
> We only set these in the properties that are passed to
> FlinkKafkaConsumer010 constructor:
>
> auto.offset.reset=latest
> bootstrap.servers=my-kafka-host:9092
> group.id=my_group
> flink.partition-discovery.interval-millis=3
>
> > is checkpointing enabled?
>
> No.
>
> > enable.auto.commit (or auto.commit.enable for Kafka 0.8) /
> auto.commit.interval.ms
>
> We have whatever is the default behaviour of Flink kafka consumer. It
> seems to commit quite often, something like every 5 seconds.
>
> > did you set setCommitOffsetsOnCheckpoints() ?
>
> No. But I checked with debugger that
> apparently enableCommitOnCheckpoints=true is the default.
>
> I also checked with debugger that offsetCommitMode=KAFKA_PERIODIC.
>
> So I guess you're right that this bug doesn't seem to be in Flink itself?
> I wonder if it's a known issue in Kafka client lib..
>
> I also took thread dump on one of the task managers in this broken state.
> But I couldn't spot anything obvious when comparing the threads to a dump
> from a job where offsets are being committed. Any way I've saved the thread
> dump in case there's something to look for specifically.
>
> Sharing the full logs of job & task managers would be a bit of a hassle,
> because I don't have an automatic way to obfuscate the logs so that I'm
> sure that there isn't anything sensitive left. Any way, there isn't
> anything else to share really. I wrote: "As you can see, it didn't log
> anything until ~2018-06-07 22:08. Also that's where the log ends".
>
> Thanks once more.
>
> On Mon, Jun 11, 2018 at 11:18 AM, Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> What’s your KafkaConsumer configuration? Especially values for:
>> - is checkpointing enabled?
>> - enable.auto.commit (or auto.commit.enable for Kafka 0.8) /
>> auto.commit.interval.ms
>> - did you set setCommitOffsetsOnCheckpoints() ?
>>
>> Please also refer to
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
>>  ,
>> especially this part:
>>
>> > Note that the Flink Kafka Consumer does not rely on the committed
>> offsets for fault tolerance guarantees. The committed offsets are only a
>> means to expose the consumer’s progress for monitoring purposes.
>>
>> Can you post full logs from all TaskManagers/JobManager and can you
>> say/estimate when did the committing brake/stop? Did you check Kafka logs
>> for any errors?
>>
>> To me it seems more like a Kafka issue/bug:
>>
>> https://community.cloudera.com/t5/Data-Ingestion-Integration/Cannot-auto-commit-offsets-for-group-console-consumer-79720/m-p/51188
>>
>> https://stackoverflow.com/questions/42362911/kafka-high-level-consumer-error-code-15/42416232#42416232
>> Especially that in your case this offsets committing is superseded by
>> Kafka coordinator failure.
>>
>> Piotrek
>>
>>
>> On 8 Jun 2018, at 10:05, Juho Autio  wrote:
>>
>> Hi,
>>
>> We have a Flink stream job that uses Flink kafka consumer. Normally it
>> commits consumer offsets to Kafka.
>>
>> However this stream ended up in a state where it's otherwise working just
>> fine, but it isn't committing offsets to 

Re: Flink kafka consumer stopped committing offsets

2018-06-11 Thread Piotr Nowojski
The more I look into it, the more it seems like a Kafka bug or some cluster 
failure from which your Kafka cluster did not recover.

In your cases auto committing should be set to true and in that case 
KafkaConsumer should commit offsets once every so often when it’s polling 
messages. Unless, for example, `coordinatorUnknown()` returns true in 
`org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#maybeAutoCommitOffsetsAsync`
 (Kafka 0.10.2.1 code base):

private void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
if (coordinatorUnknown()) {
this.nextAutoCommitDeadline = now + retryBackoffMs;
} else if (now >= nextAutoCommitDeadline) {
this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
doAutoCommitOffsetsAsync();
}
}
}

Have you checked Kafka logs? This suggests that the real problem is hidden 
behind:

>  INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - 
> Marking the coordinator 
> my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 
> rack: null) dead for group 
> aggregate-all_server_measurements_combined-20180606-1000

And maybe your Kafka cluster/consumer can not recover from this situation.

Another thing to try (simpler) is to just trying upgrading Kafka cluster.

Piotrek
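
For completeness, a minimal sketch of switching to checkpoint-based offset committing instead
of Kafka's periodic auto-commit (the topic name and interval are made up, and `properties` is
the same Properties object discussed in this thread; setCommitOffsetsOnCheckpoints is true by
default and only takes effect once checkpointing is enabled):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);   // commit offsets on completed checkpoints; interval illustrative

FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), properties);
consumer.setCommitOffsetsOnCheckpoints(true);   // default, shown here for clarity

DataStream<String> stream = env.addSource(consumer);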

> On 11 Jun 2018, at 11:44, Juho Autio  wrote:
> 
> Hi Piotr, thanks for your insights.
> 
> > What’s your KafkaConsumer configuration?
> 
> We only set these in the properties that are passed to FlinkKafkaConsumer010 
> constructor:
> 
> auto.offset.reset=latest
> bootstrap.servers=my-kafka-host:9092
> group.id =my_group
> flink.partition-discovery.interval-millis=3
> 
> > is checkpointing enabled?
> 
> No.
> 
> > enable.auto.commit (or auto.commit.enable for Kafka 0.8) / 
> > auto.commit.interval.ms 
> 
> We have whatever is the default behaviour of Flink kafka consumer. It seems 
> to commit quite often, something like every 5 seconds.
> 
> > did you set setCommitOffsetsOnCheckpoints() ?
> 
> No. But I checked with debugger that apparently 
> enableCommitOnCheckpoints=true is the default.
> 
> I also checked with debugger that offsetCommitMode=KAFKA_PERIODIC.
> 
> So I guess you're right that this bug doesn't seem to be in Flink itself? I 
> wonder if it's a known issue in Kafka client lib..
> 
> I also took thread dump on one of the task managers in this broken state. But 
> I couldn't spot anything obvious when comparing the threads to a dump from a 
> job where offsets are being committed. Any way I've saved the thread dump in 
> case there's something to look for specifically.
> 
> Sharing the full logs of job & task managers would be a bit of a hassle, 
> because I don't have an automatic way to obfuscate the logs so that I'm sure 
> that there isn't anything sensitive left. Any way, there isn't anything else 
> to share really. I wrote: "As you can see, it didn't log anything until 
> ~2018-06-07 22:08. Also that's where the log ends".
> 
> Thanks once more.
> 
> On Mon, Jun 11, 2018 at 11:18 AM, Piotr Nowojski  > wrote:
> Hi,
> 
> What’s your KafkaConsumer configuration? Especially values for:
> - is checkpointing enabled?
> - enable.auto.commit (or auto.commit.enable for Kafka 0.8) / 
> auto.commit.interval.ms 
> - did you set setCommitOffsetsOnCheckpoints() ?
> 
> Please also refer to 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
>  
> 
>  , especially this part:
> 
> > Note that the Flink Kafka Consumer does not rely on the committed offsets 
> > for fault tolerance guarantees. The committed offsets are only a means to 
> > expose the consumer’s progress for monitoring purposes.
> 
> Can you post full logs from all TaskManagers/JobManager and can you 
> say/estimate when did the committing brake/stop? Did you check Kafka logs for 
> any errors?
> 
> To me it seems more like a Kafka issue/bug:
> https://community.cloudera.com/t5/Data-Ingestion-Integration/Cannot-auto-commit-offsets-for-group-console-consumer-79720/m-p/51188
>  
> 
> https://stackoverflow.com/questions/42362911/kafka-high-level-consumer-error-code-15/42416232#42416232
>  
> 
> Especially that in your case this offsets committing is superseded by Kafka 
> coordinator failure.
> 
> Piotrek
> 
> 
>> On 8 Jun 2018, at 10:05, Juho Autio > > wrote:
>> 
>> Hi,
>> 
>> We have a Flink stream 

Writing csv to Hadoop Data stream

2018-06-11 Thread miki haiat
Hi,
I'm trying to stream data to Hadoop as CSV.

In batch processing I can use HadoopOutputFormat like that (
example/WordCount.java
).

I can't find any way to integrate BucketingSink and HadoopOutputFormat, and I'm
not sure if that is the correct way.

Any suggestions on how I can stream CSV to Hadoop?


thanks,

Miki


Re: Akka version conflict running on Flink cluster

2018-06-11 Thread Piotr Nowojski
Hi,

Please take a look at this thread first:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Akka-Http-used-in-custom-RichSourceFunction-td20314.html

Piotrek

> On 11 Jun 2018, at 11:16, Wouter Zorgdrager  wrote:
> 
> Hi,
> 
> I think I'm running into an Akka version conflict when running a Flink job on 
> a cluster.
> 
> The current situation:
> - Flink cluster on Flink 1.4.2 (using Docker)
> - Flink job which uses twitter4s [1] library and Akka version 2.5.8
> 
> In my Flink job I try to 'shutdown' an Akka actor from the twitter4s library.
> This results in a whole taskmanager crashing with the following stacktrace:
> 
> taskrunner_1  | 2018-06-11 09:03:14,454 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> Un-registering task and sending final execution state CANCELED to JobManager 
> for task Source: Custom Source -> Sink: Unnamed 
> (0ba7f7f259eee06fe2f7d783c868179b)
> taskrunner_1  | Uncaught error from thread 
> [twitter4s-streaming-akka.actor.default-dispatcher-288]: loader constraint 
> violation: when resolving method 
> "akka.actor.ActorCell$$anonfun$3.(Lakka/actor/ActorCell;)V" the class 
> loader (instance of 
> org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
>  of the current class, akka/actor/ActorCell, and the class loader (instance 
> of sun/misc/Launcher$AppClassLoader) for the method's defining class, 
> akka/actor/ActorCell$$anonfun$3, have different Class objects for the type 
> akka/actor/ActorCell used in the signature, shutting down JVM since 
> 'akka.jvm-exit-on-fatal-error' is enabled for for 
> ActorSystem[twitter4s-streaming]
> taskrunner_1  | java.lang.LinkageError: loader constraint violation: when 
> resolving method 
> "akka.actor.ActorCell$$anonfun$3.(Lakka/actor/ActorCell;)V" the class 
> loader (instance of 
> org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
>  of the current class, akka/actor/ActorCell, and the class loader (instance 
> of sun/misc/Launcher$AppClassLoader) for the method's defining class, 
> akka/actor/ActorCell$$anonfun$3, have different Class objects for the type 
> akka/actor/ActorCell used in the signature
> taskrunner_1  | at akka.actor.ActorCell.invoke(ActorCell.scala:499)
> taskrunner_1  | at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> taskrunner_1  | at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> taskrunner_1  | at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> taskrunner_1  | at 
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> taskrunner_1  | at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> taskrunner_1  | at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> taskrunner_1  | at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> taskrunner_1  | 2018-06-11 09:03:14,984 INFO  
> org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down 
> BLOB cache
> taskrunner_1  | 2018-06-11 09:03:14,985 INFO  
> org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down 
> BLOB cache
> taskrunner_1  | Exception in thread "twitter4s-streaming-shutdown-hook-1" 
> java.lang.NoClassDefFoundError: 
> akka/actor/CoordinatedShutdown$$anonfun$totalTimeout$1
> taskrunner_1  | at 
> akka.actor.CoordinatedShutdown.totalTimeout(CoordinatedShutdown.scala:515)
> taskrunner_1  | at 
> akka.actor.CoordinatedShutdown$$anonfun$initJvmHook$1.apply(CoordinatedShutdown.scala:217)
> taskrunner_1  | at 
> akka.actor.CoordinatedShutdown$$anon$2.run(CoordinatedShutdown.scala:547)
> taskrunner_1  | Caused by: java.lang.ClassNotFoundException: 
> akka.actor.CoordinatedShutdown$$anonfun$totalTimeout$1
> taskrunner_1  | at 
> java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> taskrunner_1  | at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> taskrunner_1  | at 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
> taskrunner_1  | at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> taskrunner_1  | ... 3 more
> 
> To me, it looks like an version conflict. Any suggestions how to solve this?
> 
> Thanks!
> Wouter
> 
> [1] - Twitter4s: 
> https://github.com/DanielaSfregola/twitter4s/blob/master/build.sbt 
> 
> 



Re: Kafka to Flink to Hive - Writes failing

2018-06-11 Thread Piotr Nowojski
Yes, BucketingSink is the better option here. You can start by looking at the
BucketingSink Javadocs (see also the sketch below).

Please also take a look at this:
https://stackoverflow.com/questions/47669729/how-to-write-to-orc-files-using-bucketingsink-in-apache-flink

Alternatively, if you do not need to push a lot of data, you could write your
own JDBC sink based on the JDBCAppendTableSink, adjusting it so that it
works with Hive's JDBC client.

Piotrek
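
To make the BucketingSink suggestion a bit more concrete, a minimal sketch (the path, bucket
format and batch size are made up; for ORC you would replace the StringWriter with a custom
Writer implementation along the lines of the StackOverflow answer above, and then point an
external Hive table at the sink's base directory):

import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

BucketingSink<String> hdfsSink = new BucketingSink<>("hdfs:///warehouse/mytable"); // hypothetical path
hdfsSink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH"));  // one directory per hour
hdfsSink.setWriter(new StringWriter<String>());       // swap in an ORC/Avro Writer for Hive
hdfsSink.setBatchSize(1024 * 1024 * 128L);            // roll part files at ~128 MB

// 'csvLines' is assumed to be a DataStream<String> of already-formatted records
csvLines.addSink(hdfsSink);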

> On 11 Jun 2018, at 08:12, sagar loke  wrote:
> 
> Thanks, 
> We are getting data in Avro format from Kafka and are planning to write data 
> in ORC format to Hive tables. 
> 
> 1. Is BucketingSink better option for this use case or something else ?
> 2. Is there a sample code example which we can refer ?
> 
> Thanks in advance,
> 
> On Sun, Jun 10, 2018 at 10:49 PM, Jörn Franke  > wrote:
> Don’t use the JDBC driver to write to Hive. The performance of JDBC in 
> general for large volumes is suboptimal.
> Write it to a file in HDFS in a format supported by HIve and point the table 
> definition in Hive to it.
> 
> On 11. Jun 2018, at 04:47, sagar loke  > wrote:
> 
>> I am trying to Sink data to Hive via Confluent Kafka -> Flink -> Hive using 
>> following code snippet:
>> 
>> But I am getting following error:
>> 
>> final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream stream = readFromKafka(env);
>> 
>> 
>> private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
>> BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
>> };
>> 
>>  JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
>> .setDrivername("org.apache.hive.jdbc.HiveDriver")
>> .setDBUrl("jdbc:hive2://hiveconnstring <>")
>> .setUsername("myuser")
>> .setPassword("mypass")
>> .setQuery("INSERT INTO testHiveDriverTable (key,value) VALUES 
>> (?,?)")
>> .setBatchSize(1000)
>> .setParameterTypes(FIELD_TYPES)
>> .build();
>> 
>> DataStream rows = stream.map((MapFunction) st1 
>> -> {
>> Row row = new Row(2); // 
>> row.setField(0, st1.get("SOME_ID")); 
>> row.setField(1, st1.get("SOME_ADDRESS"));
>> return row;
>> });
>> 
>> sink.emitDataStream(rows);
>> env.execute("Flink101");
>> 
>> 
>> Caused by: java.lang.RuntimeException: Execution of JDBC statement failed.
>> at org.apache.flink.api.java.io 
>> .jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219)
>> at org.apache.flink.api.java.io 
>> .jdbc.JDBCSinkFunction.snapshotState(JDBCSinkFunction.java:43)
>> at 
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>> at 
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:356)
>> ... 12 more
>> 
>> Caused by: java.sql.SQLException: Method not supported
>> at org.apache.hive.jdbc.HiveStatement.executeBatch(HiveStatement.java:381)
>> at org.apache.flink.api.java.io 
>> .jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
>> ... 17 more
>> I checked hive-jdbc driver and it seems that the Method is not supported in 
>> hive-jdbc driver.
>> 
>> public class HiveStatement implements java.sql.Statement {
>> ...
>> 
>>   @Override  
>>   public int[] executeBatch() throws SQLException {
>> throw new SQLFeatureNotSupportedException("Method not supported");
>>   }
>> 
>> ..
>> }
>> Is there any way we can achieve this using JDBC Driver ?
>> 
>> Let me know,
>> 
>> Thanks in advance.
>> 
> 
> 
> 
> -- 
> Regards,
> SAGAR.



[ANNOUNCE] Weekly community update #24

2018-06-11 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #24. Please post any news and
updates you want to share with the community to this thread.

# flink-shaded 4.0 released

flink-shaded 4.0 has been released [1] which bumps Flink's shaded Netty
dependency to 4.1.24.

# Rework of Flink website

The community has started discussing a rework of the Flink website [2]. The
goal is to improve the website's structure and to provide more valuable
information about the project and the community.

# Flink 1.6 features

The community has started a discussion about the next upcoming features in
Flink 1.6 [3]. The goal is to gather ideas and agree on a direction for the
next release. If you have any ideas or want to learn where you could help,
then please join the ML thread.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-flink-shaded-4-0-release-candidate-2-td22569.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/WEBSITE-Proposal-to-rework-the-Flink-website-tp22659.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-1-6-features-tp22632.html

Cheers,
Till


Re: Flink kafka consumer stopped committing offsets

2018-06-11 Thread Juho Autio
Hi Piotr, thanks for your insights.

> What’s your KafkaConsumer configuration?

We only set these in the properties that are passed to
FlinkKafkaConsumer010 constructor:

auto.offset.reset=latest
bootstrap.servers=my-kafka-host:9092
group.id=my_group
flink.partition-discovery.interval-millis=3

> is checkpointing enabled?

No.

> enable.auto.commit (or auto.commit.enable for Kafka 0.8) /
auto.commit.interval.ms

We have whatever is the default behaviour of Flink kafka consumer. It seems
to commit quite often, something like every 5 seconds.

> did you set setCommitOffsetsOnCheckpoints() ?

No. But I checked with debugger that apparently enableCommitOnCheckpoints=true
is the default.

I also checked with debugger that offsetCommitMode=KAFKA_PERIODIC.

So I guess you're right that this bug doesn't seem to be in Flink itself? I
wonder if it's a known issue in Kafka client lib..

I also took a thread dump on one of the task managers in this broken state.
But I couldn't spot anything obvious when comparing the threads to a dump
from a job where offsets are being committed. Anyway, I've saved the thread
dump in case there's something to look for specifically.

Sharing the full logs of the job & task managers would be a bit of a hassle,
because I don't have an automatic way to obfuscate the logs so that I'm
sure that there isn't anything sensitive left. Anyway, there isn't
anything else to share really. I wrote: "As you can see, it didn't log
anything until ~2018-06-07 22:08. Also that's where the log ends".

Thanks once more.

On Mon, Jun 11, 2018 at 11:18 AM, Piotr Nowojski 
wrote:

> Hi,
>
> What’s your KafkaConsumer configuration? Especially values for:
> - is checkpointing enabled?
> - enable.auto.commit (or auto.commit.enable for Kafka 0.8) /
> auto.commit.interval.ms
> - did you set setCommitOffsetsOnCheckpoints() ?
>
> Please also refer to https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration ,
> especially this part:
>
> > Note that the Flink Kafka Consumer does not rely on the committed
> offsets for fault tolerance guarantees. The committed offsets are only a
> means to expose the consumer’s progress for monitoring purposes.
>
> Can you post full logs from all TaskManagers/JobManager and can you
> say/estimate when did the committing brake/stop? Did you check Kafka logs
> for any errors?
>
> To me it seems more like a Kafka issue/bug:
> https://community.cloudera.com/t5/Data-Ingestion-Integration/Cannot-auto-commit-offsets-for-group-console-consumer-79720/m-p/51188
> https://stackoverflow.com/questions/42362911/kafka-high-level-consumer-error-code-15/42416232#42416232
> Especially that in your case this offsets committing is superseded by
> Kafka coordinator failure.
>
> Piotrek
>
>
> On 8 Jun 2018, at 10:05, Juho Autio  wrote:
>
> Hi,
>
> We have a Flink stream job that uses Flink kafka consumer. Normally it
> commits consumer offsets to Kafka.
>
> However this stream ended up in a state where it's otherwise working just
> fine, but it isn't committing offsets to Kafka any more. The job keeps
> writing correct aggregation results to the sink, though. At the time of
> writing this, the job has been running 14 hours without committing offsets.
>
> Below is an extract from taskmanager.log. As you can see, it didn't log
> anything until ~2018-06-07 22:08. Also that's where the log ends, these are
> the last lines so far.
>
> Could you help check if this is a know bug, possibly already fixed, or
> something new?
>
> I'm using a self-built Flink package 1.5-SNAPSHOT, flink commit
> 8395508b0401353ed07375e22882e7581d46ac0e which is not super old.
>
> Cheers,
> Juho
>
> 2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser
>  - Kafka version : 0.10.2.1
> 2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser
>  - Kafka commitId : e89bffd6b2eff799
> 2018-06-06 10:01:33,560 INFO  org.apache.kafka.clients.consu
> mer.internals.AbstractCoordinator  - Discovered coordinator
> my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id:
> 2147483550 rack: null) for group aggregate-all_server_measureme
> nts_combined-20180606-1000.
> 2018-06-06 10:01:33,563 INFO  org.apache.kafka.clients.consu
> mer.internals.AbstractCoordinator  - Discovered coordinator
> my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id:
> 2147483550 rack: null) for group aggregate-all_server_measureme
> nts_combined-20180606-1000.
> 2018-06-07 22:08:28,773 INFO  org.apache.kafka.clients.consu
> mer.internals.AbstractCoordinator  - Marking the coordinator
> my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id:
> 2147483550 rack: null) dead for group aggregate-all_server_measureme
> nts_combined-20180606-1000
> 2018-06-07 22:08:28,776 WARN  org.apache.kafka.clients.consu
> mer.internals.ConsumerCoordinator  - Auto-commit of offsets
> 

Re: Stopping of a streaming job empties state store on HDFS

2018-06-11 Thread Stefan Richter
Hi,

> On 08.06.2018 at 01:16, Peter Zende wrote:
> 
> Hi all,
> 
> We have a streaming pipeline (Flink 1.4.2) for which we implemented stoppable 
> sources to be able to  gracefully exit from the job with Yarn state 
> "finished/succeeded".
> This works fine, however after creating a savepoint, stopping the job (stop 
> event) and restarting it we remarked that the RocksDB state hasn't been 
> recovered. It looks like that it's because the state directory on HDFS was 
> emptied after issueing a stop event. This isn't the case when we cancel the 
> job, but we'd like to distinguish between job failures and stop events. After 
> reading some related tickets (e.g. FLINK-4201, FLINK-5007) it's still not 
> clear why this is the intended behavior.
> Should we use cancel instead?

Savepoints should _not_ be cleaned up in case of stop or cancellation, 
checkpoints should be cleaned up. Where are you storing the created savepoints? 
They should not go into the checkpoint directory. Stop is intended to be a more 
„graceful“ variant of cancel, but I think it is rarely used with Flink. I would 
prefer cancel except if you really require to use stoppable for some particular 
reason.

> When we backup the local state directory, stop the job, copy back the 
> directory and start a new job from the savepoint then it works fine.
> Another issue is that when we restart the job with different source (1st job: 
> HDFS and Kafka, 2nd job: Kafka), each having uids set, the recovery from 
> savepoint doesn't fail but the local state isn't restored. Is there any trick 
> besides setting allowNonRestoredState?


I need to clarify here, when you say „each having uids set“, do you set the 
same uids for both types of sources? The uid must match, because Flink will 
reassign the state in a restore based on the uids, i.e. state x goes to the 
operator with the same uid as the uid of the operator that created it in the 
previous job. The flag allowNonRestoredState has the purpose to tolerate that 
some state from a checkpoint/savepoint does not find a matching operator to 
which it should be assigned (no operator with matching uid exists in the 
jobgraph). For example, you want this if you removed operators from the job.

Best,
Stefan
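
To illustrate the uid point with a small sketch (the source, topic, properties and savepoint
path are all hypothetical): the source keeps the same uid in both versions of the job so its
state can be re-mapped on restore, and allowNonRestoredState only tolerates state whose uid no
longer exists in the new job graph.

DataStream<String> events = env
        .addSource(new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props))
        .uid("kafka-source")    // must match the uid used when the savepoint was taken
        .name("kafka-source");

// Restoring from the savepoint (CLI, path is hypothetical):
//   bin/flink run -s hdfs:///savepoints/savepoint-abc123 -n my-job.jar
// where -n / --allowNonRestoredState only skips state of operators that no longer exist.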



Akka version conflict running on Flink cluster

2018-06-11 Thread Wouter Zorgdrager
Hi,

I think I'm running into an Akka version conflict when running a Flink job
on a cluster.

The current situation:
- Flink cluster on Flink 1.4.2 (using Docker)
- Flink job which uses twitter4s [1] library and Akka version 2.5.8

In my Flink job I try to 'shutdown' an Akka actor from the twitter4s
library.
This results in a whole taskmanager crashing with the following stacktrace:

taskrunner_1  | 2018-06-11 09:03:14,454 INFO
org.apache.flink.runtime.taskmanager.TaskManager  -
Un-registering task and sending final execution state CANCELED to
JobManager for task Source: Custom Source -> Sink: Unnamed
(0ba7f7f259eee06fe2f7d783c868179b)
taskrunner_1  | Uncaught error from thread
[twitter4s-streaming-akka.actor.default-dispatcher-288]: loader constraint
violation: when resolving method
"akka.actor.ActorCell$$anonfun$3.(Lakka/actor/ActorCell;)V" the class
loader (instance of
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
of the current class, akka/actor/ActorCell, and the class loader (instance
of sun/misc/Launcher$AppClassLoader) for the method's defining class,
akka/actor/ActorCell$$anonfun$3, have different Class objects for the type
akka/actor/ActorCell used in the signature, shutting down JVM since
'akka.jvm-exit-on-fatal-error' is enabled for for
ActorSystem[twitter4s-streaming]
taskrunner_1  | java.lang.LinkageError: loader constraint violation: when
resolving method
"akka.actor.ActorCell$$anonfun$3.(Lakka/actor/ActorCell;)V" the class
loader (instance of
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
of the current class, akka/actor/ActorCell, and the class loader (instance
of sun/misc/Launcher$AppClassLoader) for the method's defining class,
akka/actor/ActorCell$$anonfun$3, have different Class objects for the type
akka/actor/ActorCell used in the signature
taskrunner_1  | at akka.actor.ActorCell.invoke(ActorCell.scala:499)
taskrunner_1  | at
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
taskrunner_1  | at akka.dispatch.Mailbox.run(Mailbox.scala:224)
taskrunner_1  | at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
taskrunner_1  | at
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
taskrunner_1  | at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
taskrunner_1  | at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
taskrunner_1  | at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
taskrunner_1  | 2018-06-11 09:03:14,984 INFO
org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting
down BLOB cache
taskrunner_1  | 2018-06-11 09:03:14,985 INFO
org.apache.flink.runtime.blob.TransientBlobCache  - Shutting
down BLOB cache
taskrunner_1  | Exception in thread "twitter4s-streaming-shutdown-hook-1"
java.lang.NoClassDefFoundError:
akka/actor/CoordinatedShutdown$$anonfun$totalTimeout$1
taskrunner_1  | at
akka.actor.CoordinatedShutdown.totalTimeout(CoordinatedShutdown.scala:515)
taskrunner_1  | at
akka.actor.CoordinatedShutdown$$anonfun$initJvmHook$1.apply(CoordinatedShutdown.scala:217)
taskrunner_1  | at
akka.actor.CoordinatedShutdown$$anon$2.run(CoordinatedShutdown.scala:547)
taskrunner_1  | Caused by: java.lang.ClassNotFoundException:
akka.actor.CoordinatedShutdown$$anonfun$totalTimeout$1
taskrunner_1  | at
java.net.URLClassLoader.findClass(URLClassLoader.java:381)
taskrunner_1  | at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
taskrunner_1  | at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
taskrunner_1  | at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
taskrunner_1  | ... 3 more

To me, it looks like a version conflict. Any suggestions on how to solve this?

Thanks!
Wouter

[1] - Twitter4s:
https://github.com/DanielaSfregola/twitter4s/blob/master/build.sbt


Re: Take elements from window

2018-06-11 Thread Piotr Nowojski
Hi,

Do I understand you correctly that you just want to have three different sliding
windows (for 3 rules) with durations of 10, 20 and 30 minutes? If so, I
haven't tested it, but I would guess that there are at least two solutions for
the problem:

1. just create three different sliding windows on top of the same stream
(possibly joining/unioning the resulting streams later; a sketch of this option
follows below)
2. create a sliding window (60 minutes, 10 minutes) and provide a custom
ReduceFunction/ProcessFunction that splits the records internally into separate
aggregation windows. Your reduce function would keep 6 aggregates for the
10-minute intervals of rule 1, 3 aggregates for the 20-minute intervals and 2
aggregates for the 30-minute intervals.

Piotrek
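
A rough sketch of option 1 (Event, Result, the key selector and the per-rule aggregate
functions are all hypothetical):

// One window per rule over the same keyed stream; the slide is 10 minutes, so the
// 10-minute window effectively behaves like a tumbling window.
DataStream<Result> rule1 = events.keyBy(Event::getKey)
        .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(10)))
        .aggregate(new Rule1Aggregate());

DataStream<Result> rule2 = events.keyBy(Event::getKey)
        .window(SlidingEventTimeWindows.of(Time.minutes(20), Time.minutes(10)))
        .aggregate(new Rule2Aggregate());

DataStream<Result> rule3 = events.keyBy(Event::getKey)
        .window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(10)))
        .aggregate(new Rule3Aggregate());

// Optionally bring the per-rule results back together before the sink.
DataStream<Result> allRules = rule1.union(rule2, rule3);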

> On 8 Jun 2018, at 21:10, Antonio Saldivar Lezama  wrote:
> 
> Hello
> 
> 
> I am wondering if it is possible to process the following scenario, to store 
> all events by event time in a general window and process elements from a 
> smaller time Frame
> 
> 1.-  Store elements in a General SlidingWindow (60 mins, 10 mins)
>   - Rule 1 -> gets 10 mins elements from the general window and get 
> aggregations
>   - Rule 2 -> gets 20 mins elements from the general window and get 
> aggregations
>   - Rule 3 -> gets 30 mins elements from the general window and get 
> aggregations
> 2.- send results 
> 
> Thank you
> Regards



Re: Heap Problem with Checkpoints

2018-06-11 Thread Piotr Nowojski
Hi,

What kind of messages are those “logs about S3 operations”? Did you try to
google them? Maybe it’s a known S3 issue?

Another approach: use a heap space analyser with which you can backtrack the
classes that are referencing those “memory leaks”, and again try to google for
any known memory issues.

It could also just mean that it’s not a memory leak, but that you simply need to
allocate more heap space for your JVM (and memory consumption will stabilise at
some point).

Piotrek

> On 8 Jun 2018, at 18:32, Fabian Wollert  wrote:
> 
> Hi, in this email thread here, I tried to set up S3 as a filesystem backend for checkpoints. Now 
> everything is working (Flink V1.5.0), but the JobMaster is accumulating Heap 
> space, with eventually killing itself with HeapSpace OOM after several hours. 
> If I don't enable Checkpointing, then everything is fine. I'm using the Flink 
> S3 Shaded Libs (tried both the Hadoop and the Presto lib, no difference in 
> this regard) from the tutorial. my checkpoint settings are this (job level):
> 
> env.enableCheckpointing(1000);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
> env.getCheckpointConfig().setCheckpointTimeout(6);
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> 
> Another clue why i suspect the S3 Checkpointing is that the heapspace dump 
> contains a lot of char[] objects with some logs about S3 operations.
> 
> anyone has an idea where to look further on this?
> 
> Cheers
> 
> --
> 
> Fabian Wollert
> Zalando SE
> 
> E-Mail: fabian.woll...@zalando.de
>  
> 
> Tamara-Danz-Straße 1
> 10243 Berlin
> Fax: +49 (0)30 2759 46 93
> E-mail: legalnot...@zalando.co.uk 
> Notifications of major holdings (Sec. 33, 38, 39 WpHG):  +49 (0)30 2000889349
> 
> Management Board:
> Robert Gentz, David Schneider, Rubin Ritter
> 
> Chairman of the Supervisory Board:
> Lothar Lanz
> 
> Person responsible for providing the contents of Zalando SE acc. to Art. 55 
> RStV [Interstate Broadcasting Agreement]: Rubin Ritter
> Registered at the Local Court Charlottenburg Berlin, HRB 158855 B
> VAT registration number: DE 260543043



Re: File does not exist prevent from Job manager to start .

2018-06-11 Thread Till Rohrmann
Hi Miki,

Flink tries first to store the checkpoint data in Hadoop before writing the
handle to the meta data in ZooKeeper. Thus, if the handle is in ZooKeeper,
then it should also have been written to HDFS. Maybe you could check the
HDFS logs whether you find something suspicious.

If ZooKeeper fails while writing the meta data state handle, then the
checkpoint should be automatically discarded. But you might want to
investigate why the ZooKeeper authentication failed. Flink needs a working
ZooKeeper quorum to run in HA mode.

Maybe you could try to reproduce a failing run and share the log files with
us. They might be helpful to further investigate the problem.

Cheers,
Till

On Wed, Jun 6, 2018 at 1:06 PM miki haiat  wrote:

> I had some   zookeeper errors that  crashed the cluster
>
>  ERROR org.apache.flink.shaded.org.apache.curator.ConnectionState
>   - Authentication failed
>
> What happen to Flink checkpoint and state if zookeeper cluster is crashed
> ?
> Is it possible that the checkpoint/state is written in zookeeper   but
> not in Hadoop and then when i try to restart the flink cluster im getting
> the file not find error ??
>
>
> On Mon, Jun 4, 2018 at 4:27 PM Till Rohrmann  wrote:
>
>> Hi Miki,
>>
>> it looks as if you did not submit a job to the cluster of which you
>> shared the logs. At least I could not see a submit job call.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jun 4, 2018 at 12:31 PM miki haiat  wrote:
>>
>>> HI Till,
>>> Iv`e managed to do  reproduce it.
>>> Full log faild_jm.log
>>> 
>>>
>>>
>>>
>>>
>>> On Mon, Jun 4, 2018 at 10:33 AM Till Rohrmann 
>>> wrote:
>>>
 Hmmm, Flink should not delete the stored blobs on the HA storage. Could
 you try to reproduce the problem and then send us the logs on DEBUG level?
 Please also check before shutting the cluster down, that the files were
 there.

 Cheers,
 Till

 On Sun, Jun 3, 2018 at 1:10 PM miki haiat  wrote:

> Hi  Till ,
>
>1. the files are not longer exist in HDFS.
>2. yes , stop and start the cluster from the bin commands.
>3.  unfortunately i deleted the log.. :(
>
>
> I wondered if this code could cause this issue , the way in using
> checkpoint
>
> StateBackend sb = new 
> FsStateBackend("hdfs://***/flink/my_city/checkpoints");
> env.setStateBackend(sb);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
> env.getCheckpointConfig().setCheckpointInterval(6);
>
>
>
>
>
>
>
>
>
>
> On Fri, Jun 1, 2018 at 6:19 PM Till Rohrmann 
> wrote:
>
>> Hi Miki,
>>
>> could you check whether the files are really no longer stored on
>> HDFS? How did you terminate the cluster? Simply calling
>> `bin/stop-cluster.sh`? I just tried it locally and it could recover the 
>> job
>> after calling `bin/start-cluster.sh` again.
>>
>> What would be helpful are the logs from the initial run of the job.
>> So if you can reproduce the problem, then this log would be very helpful.
>>
>> Cheers,
>> Till
>>
>> On Thu, May 31, 2018 at 6:14 PM, miki haiat 
>> wrote:
>>
>>> Hi,
>>>
>>> Im having some wierd issue with the JM recovery ,
>>> I using HDFS and ZOOKEEPER for HA stand alone cluster .
>>>
>>> Iv  stop the cluster change some parameters in the flink conf
>>> (Memory).
>>> But now when i start the cluster again im having an error that
>>> preventing from JM to start.
>>> somehow the checkpoint file doesn't exists in HDOOP  and JM wont
>>> start .
>>>
>>> full log JM log file
>>> 
>>>
>>>
 2018-05-31 11:57:05,568 ERROR
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error
 occurred in the cluster entrypoint.
>>>
>>> Caused by: java.lang.Exception: Cannot set up the user code
>>> libraries: File does not exist:
>>> /flink1.5/ha/default/blob/job_5c545fc3f43d69325fb9966b8dd4c8f3/blob_p-5d9f3be555d3b05f90b5e148235d25730eb65b3d-ae486e221962f7b96e36da18fe1c57ca
>>> at
>>> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:72)
>>>
>>>
>>>
>>>
>>


Re: why BlobServer use ServerSocket instead of Netty's ServerBootstrap?

2018-06-11 Thread Timo Walther

Hi,

I think this question should rather be sent to the dev@ mailing list.
But I will loop in Nico, who might know more about the implementation
details.


Regards,
Timo

Am 11.06.18 um 05:07 schrieb makeyang:

after checking the code, I found that BlobServer uses ServerSocket instead of
Netty's ServerBootstrap.
I wonder why, and whether it is OK to migrate to ServerBootstrap.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: [BucketingSink] notify on moving into pending/ final state

2018-06-11 Thread Piotr Nowojski
Hi,

I see that this could be a useful feature. What exactly is currently preventing you
from inheriting from BucketingSink? Maybe it would be enough to make the
BucketingSink easier to extend.

One thing that could collide with such a feature is that Kostas is now working on a
larger BucketingSink rework/refactor.

Piotrek

> On 8 Jun 2018, at 16:38, Rinat  wrote:
> 
> Hi mates, I got a proposal about functionality of BucketingSink.
> 
> During implementation of one of our tasks we got the following need - create 
> a meta-file, with the path and additional information about the file, created 
> by BucketingSink, when it’s been moved into final place.
> Unfortunately such behaviour is currently not available for us. 
> 
> We’ve implemented our own Sink, that provides an opportunity to register 
> notifiers, that will be called, when file state is changing, but current API 
> doesn’t allow us to add such behaviour using inheritance ...
> 
> It seems that such functionality could be useful and could be a part of
> the BucketingSink API.
> What do you think, should I make a PR?
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru 
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 



Re: Flink kafka consumer stopped committing offsets

2018-06-11 Thread Piotr Nowojski
Hi,

What’s your KafkaConsumer configuration? Especially values for:
- is checkpointing enabled?
- enable.auto.commit (or auto.commit.enable for Kafka 0.8) / 
auto.commit.interval.ms
- did you set setCommitOffsetsOnCheckpoints() ?

Please also refer to
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration ,
especially this part:

> Note that the Flink Kafka Consumer does not rely on the committed offsets for 
> fault tolerance guarantees. The committed offsets are only a means to expose 
> the consumer’s progress for monitoring purposes.

Can you post full logs from all TaskManagers/JobManager, and can you
say/estimate when the committing broke/stopped? Did you check Kafka logs for
any errors?

To me it seems more like a Kafka issue/bug:
https://community.cloudera.com/t5/Data-Ingestion-Integration/Cannot-auto-commit-offsets-for-group-console-consumer-79720/m-p/51188
https://stackoverflow.com/questions/42362911/kafka-high-level-consumer-error-code-15/42416232#42416232
Especially since in your case this offset committing is superseded by a Kafka
coordinator failure.

Piotrek

> On 8 Jun 2018, at 10:05, Juho Autio  wrote:
> 
> Hi,
> 
> We have a Flink stream job that uses Flink kafka consumer. Normally it 
> commits consumer offsets to Kafka.
> 
> However this stream ended up in a state where it's otherwise working just 
> fine, but it isn't committing offsets to Kafka any more. The job keeps 
> writing correct aggregation results to the sink, though. At the time of 
> writing this, the job has been running 14 hours without committing offsets.
> 
> Below is an extract from taskmanager.log. As you can see, it didn't log 
> anything until ~2018-06-07 22:08. Also that's where the log ends, these are 
> the last lines so far.
> 
> Could you help check if this is a know bug, possibly already fixed, or 
> something new?
> 
> I'm using a self-built Flink package 1.5-SNAPSHOT, flink commit 
> 8395508b0401353ed07375e22882e7581d46ac0e which is not super old.
> 
> Cheers,
> Juho
> 
> 2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka version : 0.10.2.1
> 2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser 
>   - Kafka commitId : e89bffd6b2eff799
> 2018-06-06 10:01:33,560 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 
>  (id: 
> 2147483550 rack: null) for group 
> aggregate-all_server_measurements_combined-20180606-1000.
> 2018-06-06 10:01:33,563 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 
>  (id: 
> 2147483550 rack: null) for group 
> aggregate-all_server_measurements_combined-20180606-1000.
> 2018-06-07 22:08:28,773 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 
>  (id: 
> 2147483550 rack: null) dead for group 
> aggregate-all_server_measurements_combined-20180606-1000
> 2018-06-07 22:08:28,776 WARN  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - 
> Auto-commit of offsets {topic1-2=OffsetAndMetadata{offset=12300395550, 
> metadata=''}, topic1-18=OffsetAndMetadata{offset=12299210444, metadata=''}, 
> topic3-0=OffsetAndMetadata{offset=5064277287, metadata=''}, 
> topic4-6=OffsetAndMetadata{offset=5492398559, metadata=''}, 
> topic2-1=OffsetAndMetadata{offset=89817267, metadata=''}, 
> topic1-10=OffsetAndMetadata{offset=12299742352, metadata=''}} failed for 
> group aggregate-all_server_measurements_combined-20180606-1000: Offset commit 
> failed with a retriable exception. You should retry committing offsets.
> 2018-06-07 22:08:29,840 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 
>  (id: 
> 2147483550 rack: null) dead for group 
> aggregate-all_server_measurements_combined-20180606-1000
> 2018-06-07 22:08:29,841 WARN  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - 
> Auto-commit of offsets 

Re: Having a backoff while experiencing checkpointing failures

2018-06-11 Thread Stefan Richter
Hi,

I think the behaviour of min_pause_between_checkpoints is either buggy or we 
should at least discuss if it would not be better to respect a pause also for 
failed checkpoints. As far as I know there is no ongoing work to add backoff, 
so I suggest you open a jira issue and make a case for this.

Best,
Stefan

> On 08.06.2018 at 06:30, vipul singh wrote:
> 
> Hello all,
> 
> Are there any recommendations on using a backoff when experiencing 
> checkpointing failures?
> What we have seen is that when a checkpoint starts to expire, the next checkpoint
> doesn't care about the previous failure and starts soon after. We
> experimented with min_pause_between_checkpoints, however that seems only to
> work for successful checkpoints (the same is discussed on this thread
> )
> 
> Are there any recommendations on how to have a backoff or is there something 
> in works to add a backoff incase of checkpointing failures? This seems very 
> valuable incase of checkpointing on an external location like s3, where one 
> can be potentially throttled or gets errors like TooBusyException from s3(for 
> example like in this jira )
> 
> Please let us know!
> Thanks,
> Vipul



Re: Kafka to Flink to Hive - Writes failing

2018-06-11 Thread sagar loke
Thanks,
We are getting data in Avro format from Kafka and are planning to write
data in ORC format to Hive tables.

1. Is BucketingSink the better option for this use case, or something else?
2. Is there a sample code example which we can refer to?

Thanks in advance,

On Sun, Jun 10, 2018 at 10:49 PM, Jörn Franke  wrote:

> Don’t use the JDBC driver to write to Hive. The performance of JDBC in
> general for large volumes is suboptimal.
> Write it to a file in HDFS in a format supported by HIve and point the
> table definition in Hive to it.
>
> On 11. Jun 2018, at 04:47, sagar loke  wrote:
>
> I am trying to Sink data to Hive via Confluent Kafka -> Flink -> Hive
> using following code snippet:
>
> But I am getting following error:
>
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream stream = readFromKafka(env);
>
>
> private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
> BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
> };
>
>  JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
> .setDrivername("org.apache.hive.jdbc.HiveDriver")
> .setDBUrl("jdbc:hive2://hiveconnstring")
> .setUsername("myuser")
> .setPassword("mypass")
> .setQuery("INSERT INTO testHiveDriverTable (key,value) VALUES 
> (?,?)")
> .setBatchSize(1000)
> .setParameterTypes(FIELD_TYPES)
> .build();
>
> DataStream rows = stream.map((MapFunction) st1 
> -> {
> Row row = new Row(2); //
> row.setField(0, st1.get("SOME_ID"));
> row.setField(1, st1.get("SOME_ADDRESS"));
> return row;
> });
>
> sink.emitDataStream(rows);
> env.execute("Flink101");
>
>
> Caused by: java.lang.RuntimeException: Execution of JDBC statement failed.
> at 
> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219)
> at 
> org.apache.flink.api.java.io.jdbc.JDBCSinkFunction.snapshotState(JDBCSinkFunction.java:43)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:356)
> ... 12 more
>
> Caused by: java.sql.SQLException: Method not supported
> at org.apache.hive.jdbc.HiveStatement.executeBatch(HiveStatement.java:381)
> at 
> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
> ... 17 more
>
> I checked hive-jdbc driver and it seems that the Method is not supported
> in hive-jdbc driver.
>
> public class HiveStatement implements java.sql.Statement {
> ...
>
>   @Override
>   public int[] executeBatch() throws SQLException {
> throw new SQLFeatureNotSupportedException("Method not supported");
>   }
>
> ..
> }
>
> Is there any way we can achieve this using JDBC Driver ?
>
> Let me know,
>
> Thanks in advance.
>
>


-- 
Regards,
SAGAR.