unable to add more servers in zookeeper quorum peers in flink 1.2

2017-03-22 Thread kanagaraj . vengidasamy
Hi All,



We are using Flink 1.2. We are unable to use more than one digit in server.x: when 
the id has more than one digit, ZooKeeper does not start. What do I need to do if 
I want to configure more servers?



# ZooKeeper quorum peers

server.0= server1:2888:3888

server.1= server2:2888:3888

server.2= server3:2888:3888

server.3= server4:2888:3888

server.4= server5:2888:3888

server.5= server6:2888:3888

server.6= server7:2888:3888

server.7= server8:2888:3888

server.8= server9:2888:3888

server.9= server10:2888:3888

#server.10= server11:2888:3888

#server.11= server12:2888:3888



Thanks

Kanagaraj


Re: unable to add more servers in zookeeper quorum peers in flink 1.2

2017-03-22 Thread Greg Hogan
Kanagaraj,

None of the server lines are matching since the regex in 
start-zookeeper-quorum.sh does not allow for spaces after the equals sign.
  ^server\.([0-9])+[[:space:]]*\=([^: \#]+)

Greg

> On Mar 22, 2017, at 8:49 AM, kanagaraj.vengidas...@verizon.com wrote:
> 
> Hi All,
>  
> We are using flink 1.2  .  Unable to add more than one digit in server.x.  
> when I have more than one digit it is not allowing to start the zookeeper.  
> What I need to do if I want to keep more servers?
>  
> # ZooKeeper quorum peers
> server.0= server1:2888:3888
> server.1= server2:2888:3888
> server.2= server3:2888:3888
> server.3= server4:2888:3888
> server.4= server5:2888:3888
> server.5= server6:2888:3888
> server.6= server7:2888:3888
> server.7= server8:2888:3888
> server.8= server9:2888:3888
> server.9= server10:2888:3888
> #server.10= server11:2888:3888
> #server.11= server12:2888:3888
>  
> Thanks
> Kanagaraj



RocksDB segfaults

2017-03-22 Thread Florian König
Hi,

I have experienced two crashes of Flink 1.2 by SIGSEGV, caused by something in 
RocksDB. What is the preferred way to report them? All I got at the moment are 
two hs_err_pid12345.log files. They are over 4000 lines long each. Is there 
anything significant that I should extract to help you guys and/or put into a 
JIRA ticket?

The first thing that came to my mind was the stack traces (see below). Anything 
else?

Thanks
Florian



Stack: [0x7fec04341000,0x7fec04442000],  sp=0x7fec0443ff48,  free 
space=1019k
Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
J 10252  org.rocksdb.RocksDB.get(J[BIJ)[B (0 bytes) @ 0x7fec925887cc 
[0x7fec92588780+0x4c]
J 27241 C2 
org.apache.flink.contrib.streaming.state.RocksDBValueState.value()Ljava/lang/Object;
 (78 bytes) @ 0x7fec94010ca4 [0x7fec940109c0+0x2e4]
j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Long;+7
j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Object;+1
J 38483 C2 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(Ljava/io/DataOutput;Lorg/apache/flink/runtime/metrics/dump/QueryScopeInfo;Ljava/lang/String;Lorg/apache/flink/metrics/Gauge;)V
 (114 bytes) @ 0x7fec918eabf0 [0x7fec918eabc0+0x30]
J 38522 C2 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)Lorg/apache/flink/runtime/metrics/dump/MetricDumpSerialization$MetricSerializationResult;
 (471 bytes) @ 0x7fec94eb6260 [0x7fec94eb57a0+0xac0]
J 47531 C2 
org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(Ljava/lang/Object;)V
 (453 bytes) @ 0x7fec95ca57a0 [0x7fec95ca4da0+0xa00]
J 5815 C2 
akka.actor.UntypedActor.aroundReceive(Lscala/PartialFunction;Ljava/lang/Object;)V
 (7 bytes) @ 0x7fec91e3ae6c [0x7fec91e3adc0+0xac]
J 5410 C2 akka.actor.ActorCell.invoke(Lakka/dispatch/Envelope;)V (104 bytes) @ 
0x7fec91d5bc44 [0x7fec91d5b9a0+0x2a4]
J 6628 C2 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(Lscala/concurrent/forkjoin/ForkJoinTask;)V
 (60 bytes) @ 0x7fec9212d050 [0x7fec9212ccc0+0x390]
J 40130 C2 scala.concurrent.forkjoin.ForkJoinWorkerThread.run()V (182 bytes) @ 
0x7fec923f8170 [0x7fec923f7fc0+0x1b0]
v  ~StubRoutines::call_stub

--

Stack: [0x7f167a5b7000,0x7f167a6b8000],  sp=0x7f167a6b5f40,  free 
space=1019k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
C  [libstdc++.so.6+0xc026b]  std::basic_string<char, std::char_traits<char>, 
std::allocator<char> >::basic_string(std::string const&)+0xb
C  [librocksdbjni8426686507832168508.so+0x2f14ca]  
rocksdb::BackupEngine::Open(rocksdb::Env*, rocksdb::BackupableDBOptions const&, 
rocksdb::BackupEngine**)+0x3a
C  [librocksdbjni8426686507832168508.so+0x180ad5]  
Java_org_rocksdb_BackupEngine_open+0x25
J 50030  org.rocksdb.BackupEngine.open(JJ)J (0 bytes) @ 0x7f16cb79aa36 
[0x7f16cb79a980+0xb6]
J 49809 C1 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot(JJ)Ljava/util/HashMap;
 (416 bytes) @ 0x7f16cd2733d4 [0x7f16cd2719a0+0x1a34]
J 51766 C2 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.snapshotPartitionedState(JJ)Ljava/util/HashMap;
 (40 bytes) @ 0x7f16cb40a1fc [0x7f16cb40a1a0+0x5c]
J 50547 C2 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(JJ)Lorg/apache/flink/streaming/runtime/tasks/StreamTaskState;
 (206 bytes) @ 0x7f16cb8be89c [0x7f16cb8be7e0+0xbc]
J 52232 C2 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(JJ)Z (650 
bytes) @ 0x7f16cbfbbf60 [0x7f16cbfbb540+0xa20]
J 52419 C2 
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(Lorg/apache/flink/runtime/io/network/api/CheckpointBarrier;)V
 (25 bytes) @ 0x7f16cbdd2624 [0x7f16cbdd25c0+0x64]
J 41649 C2 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(Lorg/apache/flink/streaming/api/operators/OneInputStreamOperator;Ljava/lang/Object;)Z
 (439 bytes) @ 0x7f16cc8aed5c [0x7f16cc8add40+0x101c]
J 33374% C2 org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run()V 
(42 bytes) @ 0x7f16cbdc21d0 [0x7f16cbdc20c0+0x110]
j  org.apache.flink.streaming.runtime.tasks.StreamTask.invoke()V+301
j  org.apache.flink.runtime.taskmanager.Task.run()V+700
j  java.lang.Thread.run()V+11
v  ~StubRoutines::call_stub
V  [libjvm.so+0x68c616]
V  [libjvm.so+0x68cb21]
V  [libjvm.so+0x68cfc7]
V  [libjvm.so+0x723d80]
V  [libjvm.so+0xa69dcf]
V  [libjvm.so+0xa69efc]
V  [libjvm.so+0x91d9d8]
C  [libpthread.so.0+0x8064]  start_thread+0xc4

Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
J 50030  org.rocksdb.BackupEngine.open(JJ)J (0 bytes) @ 0x7f16cb79a9c4 
[0x7f16cb79a980+0x44]
J 49809 C1 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnap

Re: RocksDB segfaults

2017-03-22 Thread Stephan Ewen
Hi!

It looks like you are running the RocksDB state backend from 1.1 (is there still
an old version packaged into your JAR file?)

This line indicates it:
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot
(the method no longer exists in 1.2).

Can you try running 1.2 and see if the crash still occurs? In general, I cannot
vouch 100% for RocksDB's JNI API, but in our 1.2 tests so far it has been stable.

Stephan



On Wed, Mar 22, 2017 at 3:13 PM, Florian König 
wrote:

> Hi,
>
> I have experienced two crashes of Flink 1.2 by SIGSEGV, caused by
> something in RocksDB. What is the preferred way to report them? All I got
> at the moment are two hs_err_pid12345.log files. They are over 4000 lines
> long each. Is there anything significant that I should extract to help you
> guys and/or put into a JIRA ticket?
>
> The first thing that came to my mind was the stack traces (see below).
> Anything else?
>
> Thanks
> Florian
>
> 
>
> Stack: [0x7fec04341000,0x7fec04442000],  sp=0x7fec0443ff48,
> free space=1019k
> Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
> J 10252  org.rocksdb.RocksDB.get(J[BIJ)[B (0 bytes) @ 0x7fec925887cc
> [0x7fec92588780+0x4c]
> J 27241 C2 org.apache.flink.contrib.streaming.state.
> RocksDBValueState.value()Ljava/lang/Object; (78 bytes) @
> 0x7fec94010ca4 [0x7fec940109c0+0x2e4]
> j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Long;+7
> j  com.micardo.backend.TransitionProcessor$2.
> getValue()Ljava/lang/Object;+1
> J 38483 C2 org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.
> serializeGauge(Ljava/io/DataOutput;Lorg/apache/flink/runtime/metrics/dump/
> QueryScopeInfo;Ljava/lang/String;Lorg/apache/flink/metrics/Gauge;)V (114
> bytes) @ 0x7fec918eabf0 [0x7fec918eabc0+0x30]
> J 38522 C2 org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$
> MetricDumpSerializer.serialize(Ljava/util/Map;
> Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)Lorg/apache/
> flink/runtime/metrics/dump/MetricDumpSerialization$MetricSerializationResult;
> (471 bytes) @ 0x7fec94eb6260 [0x7fec94eb57a0+0xac0]
> J 47531 C2 org.apache.flink.runtime.metrics.dump.
> MetricQueryService.onReceive(Ljava/lang/Object;)V (453 bytes) @
> 0x7fec95ca57a0 [0x7fec95ca4da0+0xa00]
> J 5815 C2 akka.actor.UntypedActor.aroundReceive(Lscala/
> PartialFunction;Ljava/lang/Object;)V (7 bytes) @ 0x7fec91e3ae6c
> [0x7fec91e3adc0+0xac]
> J 5410 C2 akka.actor.ActorCell.invoke(Lakka/dispatch/Envelope;)V (104
> bytes) @ 0x7fec91d5bc44 [0x7fec91d5b9a0+0x2a4]
> J 6628 C2 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(Lscala/concurrent/forkjoin/ForkJoinTask;)V (60 bytes) @
> 0x7fec9212d050 [0x7fec9212ccc0+0x390]
> J 40130 C2 scala.concurrent.forkjoin.ForkJoinWorkerThread.run()V (182
> bytes) @ 0x7fec923f8170 [0x7fec923f7fc0+0x1b0]
> v  ~StubRoutines::call_stub
>
> --
>
> Stack: [0x7f167a5b7000,0x7f167a6b8000],  sp=0x7f167a6b5f40,
> free space=1019k
> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native
> code)
> C  [libstdc++.so.6+0xc026b]  std::basic_string std::char_traits, std::allocator >::basic_string(std::string
> const&)+0xb
> C  [librocksdbjni8426686507832168508.so+0x2f14ca]
> rocksdb::BackupEngine::Open(rocksdb::Env*, rocksdb::BackupableDBOptions
> const&, rocksdb::BackupEngine**)+0x3a
> C  [librocksdbjni8426686507832168508.so+0x180ad5]
> Java_org_rocksdb_BackupEngine_open+0x25
> J 50030  org.rocksdb.BackupEngine.open(JJ)J (0 bytes) @
> 0x7f16cb79aa36 [0x7f16cb79a980+0xb6]
> J 49809 C1 org.apache.flink.contrib.streaming.state.RocksDBStateBackend.
> performSemiAsyncSnapshot(JJ)Ljava/util/HashMap; (416 bytes) @
> 0x7f16cd2733d4 [0x7f16cd2719a0+0x1a34]
> J 51766 C2 org.apache.flink.contrib.streaming.state.RocksDBStateBackend.
> snapshotPartitionedState(JJ)Ljava/util/HashMap; (40 bytes) @
> 0x7f16cb40a1fc [0x7f16cb40a1a0+0x5c]
> J 50547 C2 org.apache.flink.streaming.api.operators.
> AbstractUdfStreamOperator.snapshotOperatorState(JJ)Lorg/
> apache/flink/streaming/runtime/tasks/StreamTaskState; (206 bytes) @
> 0x7f16cb8be89c [0x7f16cb8be7e0+0xbc]
> J 52232 C2 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(JJ)Z
> (650 bytes) @ 0x7f16cbfbbf60 [0x7f16cbfbb540+0xa20]
> J 52419 C2 org.apache.flink.streaming.runtime.io.BarrierBuffer.
> notifyCheckpoint(Lorg/apache/flink/runtime/io/network/api/CheckpointBarrier;)V
> (25 bytes) @ 0x7f16cbdd2624 [0x7f16cbdd25c0+0x64]
> J 41649 C2 org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(Lorg/apache/flink/streaming/api/operators/
> OneInputStreamOperator;Ljava/lang/Object;)Z (439 bytes) @
> 0x7f16cc8aed5c [0x7f16cc8add40+0x101c]
> J 33374% C2 org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run()V
> (42 bytes) @ 0x7f16cbdc21d0 [0x7f16cbdc20c0+0x110]

Re: unable to add more servers in zookeeper quorum peers in flink 1.2

2017-03-22 Thread Stephan Ewen
Side note: It's quite uncommon to have more than 9 ZK nodes (at least if
the ZK ensemble is dedicated to Flink alone).

The ZooKeeper docs suggest adding "observers" to scale out ZooKeeper
further, rather than more quorum peers, because many quorum peers make ZooKeeper
transactions slower (the quorum that has to reach consensus becomes larger).
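
For example (plain ZooKeeper configuration syntax; I have not checked whether
Flink's bundled start-zookeeper-quorum.sh passes the observer suffix through, so
treat this as a sketch to verify), an observer is declared roughly like this:

# in the quorum configuration on every node: mark the extra peer as an observer
server.10=server11:2888:3888:observer
# and additionally in the configuration used by server11 itself
peerType=observer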

On Wed, Mar 22, 2017 at 3:08 PM, Greg Hogan  wrote:

> Kanagaraj,
>
> None of the server lines are matching since the regex in
> start-zookeeper-quorum.sh does not allow for spaces after the equals sign.
>   ^server\.([0-9])+[[:space:]]*\=([^: \#]+)
>
> Greg
>
> On Mar 22, 2017, at 8:49 AM, kanagaraj.vengidas...@verizon.com wrote:
>
> Hi All,
>
> We are using flink 1.2  .  Unable to add more than one digit in server.x.
> when I have more than one digit it is not allowing to start the zookeeper.
> What I need to do if I want to keep more servers?
>
> # ZooKeeper quorum peers
> server.0= server1:2888:3888
> server.1= server2:2888:3888
> server.2= server3:2888:3888
> server.3= server4:2888:3888
> server.4= server5:2888:3888
> server.5= server6:2888:3888
> server.6= server7:2888:3888
> server.7= server8:2888:3888
> server.8= server9:2888:3888
> server.9= server10:2888:3888
> #server.10= server11:2888:3888
> #server.11= server12:2888:3888
>
> Thanks
> Kanagaraj
>
>
>


RE: [E] Re: unable to add more servers in zookeeper quorum peers in flink 1.2

2017-03-22 Thread kanagaraj . vengidasamy
Greg,

Sorry about that. When I copied the config I replaced the real server names; I
don't have spaces in my configuration. The issue is that when I have server.10=
and server.11= and try to start ZooKeeper, the myid for those servers is written
as 0 and 1, which conflicts with server.0 and server.1.


Thanks


Kanagaraj Vengidasamy
RTCI

7701 E Telecom PKWY
Temple Terrace, FL 33637

O 813.978.4372 | M 813.455.9757


From: Greg Hogan [mailto:c...@greghogan.com]
Sent: Wednesday, March 22, 2017 10:08 AM
To: user@flink.apache.org
Subject: [E] Re: unable to add more servers in zookeeper quorum peers in flink 
1.2

Kanagaraj,

None of the server lines are matching since the regex in 
start-zookeeper-quorum.sh does not allow for spaces after the equals sign.
  ^server\.([0-9])+[[:space:]]*\=([^: \#]+)

Greg

On Mar 22, 2017, at 8:49 AM, 
kanagaraj.vengidas...@verizon.com 
wrote:

Hi All,

We are using flink 1.2  .  Unable to add more than one digit in server.x.  when 
I have more than one digit it is not allowing to start the zookeeper.  What I 
need to do if I want to keep more servers?

# ZooKeeper quorum peers
server.0=server1:2888:3888
server.1=server2:2888:3888
server.2=server3:2888:3888
server.3=server4:2888:3888
server.4=server5:2888:3888
server.5=server6:2888:3888
server.6=server7:2888:3888
server.7=server8:2888:3888
server.8=server9:2888:3888
server.9=server10:2888:3888
#server.10=server11:2888:3888
#server.11=server12:2888:3888

Thanks
Kanagaraj



Re: [E] Re: unable to add more servers in zookeeper quorum peers in flink 1.2

2017-03-22 Thread Chesnay Schepler

I guess that's because the grouping is wrong.

^server\.([0-9])+[[:space:]]*\=([^: \#]+)

should probably be

^server\.([0-9]+)[[:space:]]*\=([^: \#]+)

Could you modify the .sh script as such and try again?
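
For illustration, here is a minimal, hypothetical Java sketch (not code from the
Flink scripts; it uses \s in place of [[:space:]], but the grouping behaves the
same) of why the original pattern yields the wrong id for two-digit servers:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ZkRegexCheck {
    public static void main(String[] args) {
        String line = "server.10=server11:2888:3888";

        // ([0-9])+ repeats a one-character group, so group(1) keeps only the LAST digit
        Matcher broken = Pattern.compile("^server\\.([0-9])+\\s*=([^: #]+)").matcher(line);
        // ([0-9]+) captures the whole id
        Matcher fixed = Pattern.compile("^server\\.([0-9]+)\\s*=([^: #]+)").matcher(line);

        if (broken.find()) {
            System.out.println("old grouping -> id " + broken.group(1)); // prints 0
        }
        if (fixed.find()) {
            System.out.println("new grouping -> id " + fixed.group(1));  // prints 10
        }
    }
}

That would also explain the conflicting myid values of 0 and 1 you saw for
server.10 and server.11.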

Regards,
Chesnay

On 22.03.2017 16:10, kanagaraj.vengidas...@verizon.com wrote:


Greg,

Sorry about that.. when I copy the config and replaced the real server 
names . I don’t have space in my configurations. The  issue is when I 
have server.10=, server.11=


And tried to start the zookeeper,  Myid replaced with 0 and 1 for 
these servers which is conflicting with server.0 and server.1


Thanks



Kanagaraj Vengidasamy
RTCI

7701 E Telecom PKWY
Temple Terrace, FL 33637

O 813.978.4372 | M 813.455.9757




From: Greg Hogan [mailto:c...@greghogan.com]
Sent: Wednesday, March 22, 2017 10:08 AM
To: user@flink.apache.org
Subject: [E] Re: unable to add more servers in zookeeper quorum peers in flink 1.2


Kanagaraj,

None of the server lines are matching since the regex in 
start-zookeeper-quorum.sh does not allow for spaces after the equals sign.


  ^server\.([0-9])+[[:space:]]*\=([^: \#]+)

Greg

On Mar 22, 2017, at 8:49 AM, kanagaraj.vengidas...@verizon.com
 wrote:

Hi All,

We are using flink 1.2  .  Unable to add more than one digit in
server.x.  when I have more than one digit it is not allowing to
start the zookeeper.  What I need to do if I want to keep more
servers?

# ZooKeeper quorum peers

server.0=server1:2888:3888

server.1=server2:2888:3888

server.2=server3:2888:3888

server.3=server4:2888:3888

server.4=server5:2888:3888

server.5=server6:2888:3888

server.6=server7:2888:3888

server.7=server8:2888:3888

server.8=server9:2888:3888

server.9=server10:2888:3888

#server.10=server11:2888:3888

#server.11=server12:2888:3888

Thanks

Kanagaraj





Re: RocksDB segfaults

2017-03-22 Thread Florian König
Hi Stephan,

you are right, the second stack trace is indeed from a run of Flink 1.1.4. 
Sorry, my bad.

That leaves us with the first trace of a segfault for which I can guarantee 
that it brought down a 1.2.0 instance. Unfortunately I cannot reproduce the 
problem. It has happened twice so far, but I can’t see any pattern. Is there 
anything in the stack trace that could point us to a probable cause?

Florian

> On 22.03.2017 at 16:00, Stephan Ewen wrote:
> 
> Hi!
> 
> It looks like you are running the RocksDB state backend 1.1 (is still an old 
> version packaged into your JAR file?)
> 
> This line indicates that: 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot
>  (the method does not exist in 1.2 any more)
> 
> Can you try and run 1.2 and see if that still occurs? In general, I cannot 
> vouch 100% for RocksDBs JNI API, but in our 1.2 tests so far it was stable.
> 
> Stephan
> 
> 
> 
> On Wed, Mar 22, 2017 at 3:13 PM, Florian König  
> wrote:
> Hi,
> 
> I have experienced two crashes of Flink 1.2 by SIGSEGV, caused by something 
> in RocksDB. What is the preferred way to report them? All I got at the moment 
> are two hs_err_pid12345.log files. They are over 4000 lines long each. Is 
> there anything significant that I should extract to help you guys and/or put 
> into a JIRA ticket?
> 
> The first thing that came to my mind was the stack traces (see below). 
> Anything else?
> 
> Thanks
> Florian
> 
> 
> 
> Stack: [0x7fec04341000,0x7fec04442000],  sp=0x7fec0443ff48,  free 
> space=1019k
> Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
> J 10252  org.rocksdb.RocksDB.get(J[BIJ)[B (0 bytes) @ 0x7fec925887cc 
> [0x7fec92588780+0x4c]
> J 27241 C2 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value()Ljava/lang/Object;
>  (78 bytes) @ 0x7fec94010ca4 [0x7fec940109c0+0x2e4]
> j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Long;+7
> j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Object;+1
> J 38483 C2 
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(Ljava/io/DataOutput;Lorg/apache/flink/runtime/metrics/dump/QueryScopeInfo;Ljava/lang/String;Lorg/apache/flink/metrics/Gauge;)V
>  (114 bytes) @ 0x7fec918eabf0 [0x7fec918eabc0+0x30]
> J 38522 C2 
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)Lorg/apache/flink/runtime/metrics/dump/MetricDumpSerialization$MetricSerializationResult;
>  (471 bytes) @ 0x7fec94eb6260 [0x7fec94eb57a0+0xac0]
> J 47531 C2 
> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(Ljava/lang/Object;)V
>  (453 bytes) @ 0x7fec95ca57a0 [0x7fec95ca4da0+0xa00]
> J 5815 C2 
> akka.actor.UntypedActor.aroundReceive(Lscala/PartialFunction;Ljava/lang/Object;)V
>  (7 bytes) @ 0x7fec91e3ae6c [0x7fec91e3adc0+0xac]
> J 5410 C2 akka.actor.ActorCell.invoke(Lakka/dispatch/Envelope;)V (104 bytes) 
> @ 0x7fec91d5bc44 [0x7fec91d5b9a0+0x2a4]
> J 6628 C2 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(Lscala/concurrent/forkjoin/ForkJoinTask;)V
>  (60 bytes) @ 0x7fec9212d050 [0x7fec9212ccc0+0x390]
> J 40130 C2 scala.concurrent.forkjoin.ForkJoinWorkerThread.run()V (182 bytes) 
> @ 0x7fec923f8170 [0x7fec923f7fc0+0x1b0]
> v  ~StubRoutines::call_stub
> 
> --
> 
> Stack: [0x7f167a5b7000,0x7f167a6b8000],  sp=0x7f167a6b5f40,  free 
> space=1019k
> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native 
> code)
> C  [libstdc++.so.6+0xc026b]  std::basic_string, 
> std::allocator >::basic_string(std::string const&)+0xb
> C  [librocksdbjni8426686507832168508.so+0x2f14ca]  
> rocksdb::BackupEngine::Open(rocksdb::Env*, rocksdb::BackupableDBOptions 
> const&, rocksdb::BackupEngine**)+0x3a
> C  [librocksdbjni8426686507832168508.so+0x180ad5]  
> Java_org_rocksdb_BackupEngine_open+0x25
> J 50030  org.rocksdb.BackupEngine.open(JJ)J (0 bytes) @ 0x7f16cb79aa36 
> [0x7f16cb79a980+0xb6]
> J 49809 C1 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot(JJ)Ljava/util/HashMap;
>  (416 bytes) @ 0x7f16cd2733d4 [0x7f16cd2719a0+0x1a34]
> J 51766 C2 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.snapshotPartitionedState(JJ)Ljava/util/HashMap;
>  (40 bytes) @ 0x7f16cb40a1fc [0x7f16cb40a1a0+0x5c]
> J 50547 C2 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(JJ)Lorg/apache/flink/streaming/runtime/tasks/StreamTaskState;
>  (206 bytes) @ 0x7f16cb8be89c [0x7f16cb8be7e0+0xbc]
> J 52232 C2 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(JJ)Z 
> (650 bytes) @ 0x7f16cbfbbf60 [0x7f16cbfbb540+0xa20]
> J 52419 C2 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyChe

Cassandra Sink version

2017-03-22 Thread Nancy Estrada
Hi all,

Which Cassandra version is currently supported by the Flink 1.2 connector?
And which version will Flink 1.3 support?

Thank you!





Re: RocksDB segfaults

2017-03-22 Thread Stefan Richter
Hi,

for the first stack trace, I assume that the backend is not accessed as part of
processing an element, but by another thread. Is that correct? RocksDB requires
accessing threads to hold the task's checkpointing lock; otherwise they might call
methods on an instance that has already been disposed. However, this should only
happen when the task was already about to shut down anyway. Is that a plausible
explanation for the behaviour you observed? I also cannot rule out that segfaults
happen inside RocksDB or in the JNI bridge.
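
As an illustration only (a generic sketch, not code from your job), the checkpoint
lock is exposed to user code in the source API; a user-driven thread should touch
state and emit records only while holding it:

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class GuardedSource extends RichSourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
        long counter = 0L;
        while (running) {
            // Access state / emit only under the task's checkpointing lock, so a
            // concurrent checkpoint cannot snapshot or dispose the backend mid-access.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(10); // wait outside of the lock
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}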

Best,
Stefan

> On 22.03.2017 at 16:53, Florian König wrote:
> 
> Hi Stephen,
> 
> you are right, the second stack trace is indeed from a run of Flink 1.1.4. 
> Sorry, my bad.
> 
> That leaves us with the first trace of a segfault for which I can guarantee 
> that it brought down a 1.2.0 instance. Unfortunately I cannot reproduce the 
> problem. It has happened twice so far, but I can’t see any pattern. Is there 
> anything in the stack trace that could point us to a probable cause?
> 
> Florian
> 
>> On 22.03.2017 at 16:00, Stephan Ewen wrote:
>> 
>> Hi!
>> 
>> It looks like you are running the RocksDB state backend 1.1 (is still an old 
>> version packaged into your JAR file?)
>> 
>> This line indicates that: 
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot
>>  (the method does not exist in 1.2 any more)
>> 
>> Can you try and run 1.2 and see if that still occurs? In general, I cannot 
>> vouch 100% for RocksDBs JNI API, but in our 1.2 tests so far it was stable.
>> 
>> Stephan
>> 
>> 
>> 
>> On Wed, Mar 22, 2017 at 3:13 PM, Florian König  
>> wrote:
>> Hi,
>> 
>> I have experienced two crashes of Flink 1.2 by SIGSEGV, caused by something 
>> in RocksDB. What is the preferred way to report them? All I got at the 
>> moment are two hs_err_pid12345.log files. They are over 4000 lines long 
>> each. Is there anything significant that I should extract to help you guys 
>> and/or put into a JIRA ticket?
>> 
>> The first thing that came to my mind was the stack traces (see below). 
>> Anything else?
>> 
>> Thanks
>> Florian
>> 
>> 
>> 
>> Stack: [0x7fec04341000,0x7fec04442000],  sp=0x7fec0443ff48,  
>> free space=1019k
>> Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
>> J 10252  org.rocksdb.RocksDB.get(J[BIJ)[B (0 bytes) @ 0x7fec925887cc 
>> [0x7fec92588780+0x4c]
>> J 27241 C2 
>> org.apache.flink.contrib.streaming.state.RocksDBValueState.value()Ljava/lang/Object;
>>  (78 bytes) @ 0x7fec94010ca4 [0x7fec940109c0+0x2e4]
>> j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Long;+7
>> j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Object;+1
>> J 38483 C2 
>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(Ljava/io/DataOutput;Lorg/apache/flink/runtime/metrics/dump/QueryScopeInfo;Ljava/lang/String;Lorg/apache/flink/metrics/Gauge;)V
>>  (114 bytes) @ 0x7fec918eabf0 [0x7fec918eabc0+0x30]
>> J 38522 C2 
>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)Lorg/apache/flink/runtime/metrics/dump/MetricDumpSerialization$MetricSerializationResult;
>>  (471 bytes) @ 0x7fec94eb6260 [0x7fec94eb57a0+0xac0]
>> J 47531 C2 
>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(Ljava/lang/Object;)V
>>  (453 bytes) @ 0x7fec95ca57a0 [0x7fec95ca4da0+0xa00]
>> J 5815 C2 
>> akka.actor.UntypedActor.aroundReceive(Lscala/PartialFunction;Ljava/lang/Object;)V
>>  (7 bytes) @ 0x7fec91e3ae6c [0x7fec91e3adc0+0xac]
>> J 5410 C2 akka.actor.ActorCell.invoke(Lakka/dispatch/Envelope;)V (104 bytes) 
>> @ 0x7fec91d5bc44 [0x7fec91d5b9a0+0x2a4]
>> J 6628 C2 
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(Lscala/concurrent/forkjoin/ForkJoinTask;)V
>>  (60 bytes) @ 0x7fec9212d050 [0x7fec9212ccc0+0x390]
>> J 40130 C2 scala.concurrent.forkjoin.ForkJoinWorkerThread.run()V (182 bytes) 
>> @ 0x7fec923f8170 [0x7fec923f7fc0+0x1b0]
>> v  ~StubRoutines::call_stub
>> 
>> --
>> 
>> Stack: [0x7f167a5b7000,0x7f167a6b8000],  sp=0x7f167a6b5f40,  
>> free space=1019k
>> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native 
>> code)
>> C  [libstdc++.so.6+0xc026b]  std::basic_string, 
>> std::allocator >::basic_string(std::string const&)+0xb
>> C  [librocksdbjni8426686507832168508.so+0x2f14ca]  
>> rocksdb::BackupEngine::Open(rocksdb::Env*, rocksdb::BackupableDBOptions 
>> const&, rocksdb::BackupEngine**)+0x3a
>> C  [librocksdbjni8426686507832168508.so+0x180ad5]  
>> Java_org_rocksdb_BackupEngine_open+0x25
>> J 50030  org.rocksdb.BackupEngine.open(JJ)J (0 bytes) @ 0x7f16cb79aa36 
>> [0x7f16cb79a980+0xb6]
>> J 49809 C1 
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapsh

Re: Cassandra Sink version

2017-03-22 Thread Kostas Kloudas
Hi Nancy,

For both Flink 1.2 and Flink 1.3, our tests are written against Cassandra 2.2.5.
We use version 3.0.x of the DataStax Java driver
(https://github.com/datastax/java-driver/tree/3.0.x),
so please check there to see which Cassandra versions it supports.

I also include Chesnay in this, to see if he has anything to add.

Thanks,
Kostas

> On Mar 22, 2017, at 5:32 PM, Nancy Estrada  wrote:
> 
> Hi all,
> 
> Which Cassandra version is currently supported by the Flink 1.2 connector?
> and I am wondering which version Flink 1.3 will be supporting?
> 
> Thank you!
> 
> 
> 



Kinesis connector SHARD_GETRECORDS_MAX default value

2017-03-22 Thread Steffen Hausmann

Hi there,

I recently ran into problems with a Flink job running on an EMR cluster 
consuming events from a Kinesis stream that receives roughly 15k 
events/second. Although the EMR cluster was substantially scaled and CPU 
utilization and system load were well below any alarming threshold, the 
processing of events from the stream increasingly fell behind.


Eventually, it turned out that SHARD_GETRECORDS_MAX defaults to 100, which 
apparently causes too much overhead when consuming events from the stream. 
Increasing the value to 5000 (a single GetRecords call to Kinesis can 
retrieve up to 10k records) made the problem go away.
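
For reference, this is roughly how I applied the setting; treat it as a hedged
sketch, since the constants class, region, and stream name below are assumptions
to adapt to your own setup:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KinesisReadJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "eu-west-1");
        consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
        // fetch up to 5000 records per GetRecords call instead of the default 100
        consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "5000");

        DataStream<String> events = env.addSource(
                new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), consumerConfig));

        events.print();
        env.execute("Kinesis consumer with larger GetRecords batches");
    }
}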


I wonder why the default value for SHARD_GETRECORDS_MAX is chosen so low 
(100x less than it could be). The Kinesis Client Library defaults to 
5000 and it's recommended to use this default value: 
http://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#consumer-app-reading-slower.


Thanks for the clarification!

Cheers,
Steffen


Re: Kinesis connector SHARD_GETRECORDS_MAX default value

2017-03-22 Thread Tzu-Li (Gordon) Tai
Hi Steffen,

I have to admit that I didn't put too much thought into the default values for 
the Kinesis consumer.

I’d say it would be reasonable to change the default values to follow KCL’s 
settings. Could you file a JIRA for this?

In general, we might want to reconsider all the default values for configs 
related to the getRecords call, i.e.
- SHARD_GETRECORDS_MAX
- SHARD_GETRECORDS_INTERVAL_MILLIS
- SHARD_GETRECORDS_BACKOFF_*

Cheers,
Gordon

On March 23, 2017 at 2:12:32 AM, Steffen Hausmann (stef...@hausmann-family.de) 
wrote:

Hi there,  

I recently ran into problems with a Flink job running on an EMR cluster  
consuming events from a Kinesis stream receiving roughly 15k  
event/second. Although the EMR cluster was substantially scaled and CPU  
utilization and system load were well below any alarming threshold, the  
processing of events of the stream increasingly fell behind.  

Eventually, it turned out that the SHARD_GETRECORDS_MAX defaults to 100  
which is apparently causing too much overhead when consuming events from  
the stream. Increasing the value to 5000, a single GetRecords call to  
Kinesis can retrieve up to 10k records, made the problem go away.  

I wonder why the default value for SHARD_GETRECORDS_MAX is chosen so low  
(100x less than it could be). The Kinesis Client Library defaults to  
5000 and it's recommended to use this default value:  
http://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#consumer-app-reading-slower.
  

Thanks for the clarification!  

Cheers,  
Steffen  


RE: [E] Re: unable to add more servers in zookeeper quorum peers in flink 1.2

2017-03-22 Thread kanagaraj . vengidasamy
That worked. Thanks, Chesnay.



Kanagaraj Vengidasamy
RTCI

7701 E Telecom PKWY
Temple Terrace, FL 33637

O 813.978.4372 | M 813.455.9757


From: Chesnay Schepler [mailto:ches...@apache.org]
Sent: Wednesday, March 22, 2017 11:23 AM
To: user@flink.apache.org
Subject: Re: [E] Re: unable to add more servers in zookeeper quorum peers in 
flink 1.2

I guess that's because the grouping is wrong.

^server\.([0-9])+[[:space:]]*\=([^: \#]+)
should probably be

^server\.([0-9]+)[[:space:]]*\=([^: \#]+)
Could you modify the .sh script as such and try again?

Regards,
Chesnay

On 22.03.2017 16:10, 
kanagaraj.vengidas...@verizon.com 
wrote:
Greg,

Sorry about that.. when I copy the config and replaced the real server names . 
I don't have space in my configurations. The  issue is when I have server.10=, 
server.11=
And tried to start the zookeeper,  Myid replaced with 0 and 1 for these servers 
which is conflicting with server.0 and server.1


Thanks


Kanagaraj Vengidasamy
RTCI

7701 E Telecom PKWY
Temple Terrace, FL 33637

O 813.978.4372 | M 813.455.9757




From: Greg Hogan [mailto:c...@greghogan.com]
Sent: Wednesday, March 22, 2017 10:08 AM
To: user@flink.apache.org
Subject: [E] Re: unable to add more servers in zookeeper quorum peers in flink 
1.2

Kanagaraj,

None of the server lines are matching since the regex in 
start-zookeeper-quorum.sh does not allow for spaces after the equals sign.
  ^server\.([0-9])+[[:space:]]*\=([^: \#]+)

Greg

On Mar 22, 2017, at 8:49 AM, 
kanagaraj.vengidas...@verizon.com 
wrote:

Hi All,

We are using flink 1.2  .  Unable to add more than one digit in server.x.  when 
I have more than one digit it is not allowing to start the zookeeper.  What I 
need to do if I want to keep more servers?

# ZooKeeper quorum peers
server.0=server1:2888:3888
server.1=server2:2888:3888
server.2=server3:2888:3888
server.3=server4:2888:3888
server.4=server5:2888:3888
server.5=server6:2888:3888
server.6=server7:2888:3888
server.7=server8:2888:3888
server.8=server9:2888:3888
server.9=server10:2888:3888
#server.10=server11:2888:3888
#server.11=server12:2888:3888

Thanks
Kanagaraj




Odd error

2017-03-22 Thread Telco Phone

Getting this error from the following code:

DataStream stream = env.addSource(new FlinkKafkaConsumer08<>("raw", schema, properties))
        .setParallelism(30)
        .flatMap(new RecordSplit()).setParallelism(30)
        .name("Raw splitter")
        .keyBy("id", "keyByHelper", "schema");

Field expression must be equal to '*' or '_' for non-composite types.
org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:273)
com.company.ingest.stream.RawRecord.main(RawRecord.java:38)

I did add a new long compareTo:

@Override
public int compareTo(SchemaRecord o) {
    return Long.compare(this.keyByHelper, o.keyByHelper);
}

I can't seem to get past this error...