Hi Niclas,

Glad that you got it working! Thanks for sharing the problem and solution.
Best,
Fabian

2018-02-19 9:29 GMT+01:00 Niclas Hedhman <nic...@apache.org>:

> (Sorry for the incoherent order and ramblings. I am writing this as I am
> trying to sort out what is going on...)
>
> 1. It is the first message in the Kafka topic that gets processed. If I set
> the offset manually, it will pick up the message at that point, process it,
> and ignore all following messages.
>
> 2. Yes, the Kafka console consumer tool is spitting out the messages
> without problem. By the way, they are plain Strings, well, serialized JSON
> objects.
>
> 3. The code is a bit messy, but I have copied out the relevant parts below.
>
> I also noticed that a LOT of exceptions are thrown ("breakpoint on any
> exception"), mostly ClassCastException, classes not found and
> NoSuchMethodException, but nothing that bubbles up out of the internals.
> Is this part of Scala hammering the JVM, or just the normal JVM class
> loading sequence (no wonder it is so slow)? Is that expected?
>
> I have tried to use both the ObjectMapper from Jackson proper, as well as
> the shaded ObjectMapper in Flink. No difference.
>
> Recap: I positioned the Kafka consumer at the 8th message from the end.
> Only that message is consumed; the remaining 7 are ignored/swallowed.
>
> OK, so I think I have traced this down to something happening in the
> CassandraSink. An Exception is being thrown somewhere, which I can see
> because the finally clause of Kafka09Fetcher.runFetchLoop() is called.
>
> Found it (hours later in debugging), on this line (Flink 1.4.1):
>
> org/apache/flink/cassandra/shaded/com/google/common/util/concurrent/Futures.class:258
>
> which contains
>
> future.addListener(callbackListener, executor); // IDEA says 'future' is
> of type DefaultResultSetFuture
>
> This throws an Exception without stepping into the addListener() method.
> There is nothing catching the Exception (and I don't want to go down the
> rabbit hole of building from source), so I can't really say what Exception
> is being thrown.
> IDEA doesn't seem to report it, and the catch clauses in
> OperatorChain.pushToOperator() (ClassCastException and Exception) are in
> the call stack but don't catch it, which could suggest a java.lang.Error,
> and NoClassDefFoundError comes to mind, since there are SO MANY
> classloading exceptions going on all the time.
>
> Hold on a second... There are TWO
> com.datastax.driver.core.DefaultResultSetFuture types in the classpath:
> one from the Cassandra client that I declared, and one from inside the
> flink-connector-cassandra_2.11 artifact...
>
> So will it work if I remove my own dependency declaration, and that's it?
>
> YEEEEESSSSS!!! Finally.....
>
> SOLVED!
>
> -o-o-o-o-o-
>
> public static void main( String[] args )
>     throws Exception
> {
>     cli = CommandLine.populateCommand( new ServerCliOptions(), args );
>     initializeCassandra( cli );
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment()
>                                   .setMaxParallelism( 32768 );
>     // createPollDocPipeline( env );
>     createAdminPipeline( env );
>     env.execute( "schedule.poll" );
> }
>
> private static void createAdminPipeline( StreamExecutionEnvironment env )
> {
>     try
>     {
>         FlinkKafkaConsumer011<String> adminSource = createKafkaAdminSource();
>         SplitStream<AdminCommand> adminStream =
>             env.addSource( adminSource )
>                .name( "scheduler.admin" )
>                .map( value -> {
>                    try
>                    {
>                        return mapper.readValue( value, AdminCommand.class );
>                    }
>                    catch( Throwable e )
>                    {
>                        LOG.error( "Unexpected error deserializing AdminCommand", e );
>                        return null;
>                    }
>                } )
>                .name( "admin.command.read" )
>                .split( value -> singletonList( value.action() ) );
>
>         SingleOutputStreamOperator<Tuple3<List<String>, String, String>> insertStream =
>             adminStream.select( AdminCommand.CMD_SCHEDULE_INSERT )
>                        .map( new GetPollDeclaration() )
>                        .name( "scheduler.admin.insert" )
>                        .map( new PollDeclarationToTuple3Map() )
>                        .name( "scheduler.pollDeclToTuple3" )
>                        .filter( tuple -> tuple != null );
>
>         SingleOutputStreamOperator<Tuple3<List<String>, String, String>> deleteStream =
>             adminStream.select( AdminCommand.CMD_SCHEDULE_DELETE )
>                        .map( new GetPollDeclaration() )
>                        .name( "scheduler.admin.delete" )
>                        .map( new PollDeclarationToTuple3Map() )
>                        .name( "scheduler.pollDeclToTuple3" )
>                        .filter( tuple -> tuple != null );
>
>         CassandraSink.addSink( insertStream )
>                      .setHost( cli.primaryCassandraHost(), cli.primaryCassandraPort() )
>                      .setQuery( String.format( INSERT_SCHEDULE, cli.cassandraKeyspace ) )
>                      .build();
>
>         CassandraSink.addSink( deleteStream )
>                      .setHost( cli.primaryCassandraHost(), cli.primaryCassandraPort() )
>                      .setQuery( String.format( DELETE_SCHEDULE, cli.cassandraKeyspace ) )
>                      .build();
>     }
>     catch( Throwable e )
>     {
>         String message = "Unable to start Scheduling Admin";
>         LOG.error( message );
>         throw new RuntimeException( message, e );
>     }
> }
>
> private static class GetPollDeclaration
>     implements MapFunction<AdminCommand, PollDeclaration>
> {
>     private static final Logger LOG =
>         LoggerFactory.getLogger( GetPollDeclaration.class );
>
>     @Override
>     public PollDeclaration map( AdminCommand command )
>         throws Exception
>     {
>         try
>         {
>             if( command == null )
>             {
>                 return null;
>             }
>             return (PollDeclaration) command.value();
>         }
>         catch( Throwable e )
>         {
>             LOG.error( "Unable to cast command data to PollDeclaration", e );
>             return null;
>         }
>     }
> }
>
> private static class PollDeclarationToTuple3Map
>     implements MapFunction<PollDeclaration, Tuple3<List<String>, String, String>>
> {
>     @Override
>     public Tuple3<List<String>, String, String> map( PollDeclaration decl )
>         throws Exception
>     {
>         try
>         {
>             if( decl == null )
>             {
>                 return null;
>             }
>             return new Tuple3<>( singletonList( mapper.writeValueAsString( decl ) ),
>                                  decl.zoneId + ":" + decl.schedule, decl.url );
>         }
>         catch( Throwable e )
>         {
>             LOG.error( "Unable to serialize PollDeclaration", e );
>             return null;
>         }
>     }
> }
>
> Flink Dependencies:
>
> flink : [
>     [group: "org.apache.flink", name: "flink-core", version: flinkVersion],
>     [group: "org.apache.flink", name: "flink-java", version: flinkVersion],
>     [group: "org.apache.flink", name: "flink-connector-cassandra_2.11", version: flinkVersion],
>     [group: "org.apache.flink", name: "flink-connector-kafka-0.11_2.11", version: flinkVersion],
>     [group: "org.apache.flink", name: "flink-queryable-state-runtime_2.11", version: flinkVersion],
>     [group: "org.apache.flink", name: "flink-streaming-java_2.11", version: flinkVersion],
>     [group: "org.apache.flink", name: "flink-streaming-scala_2.11", version: flinkVersion]
> ],
>
> On Sun, Feb 18, 2018 at 8:11 PM, Xingcan Cui <xingc...@gmail.com> wrote:
>
>> Hi Niclas,
>>
>> About the second point you mentioned, was the processed message a random
>> one or a fixed one?
>>
>> The default startup mode for FlinkKafkaConsumer is
>> StartupMode.GROUP_OFFSETS; maybe you could try StartupMode.EARLIEST while
>> debugging. Also, before that, you may try fetching the messages with the
>> Kafka console consumer tool to see whether they can be consumed completely.
>>
>> Besides, I wonder if you could provide the code for your Flink pipeline.
>> That'll be helpful.
>>
>> Best,
>> Xingcan
>>
>> On 18 Feb 2018, at 7:52 PM, Niclas Hedhman <nic...@apache.org> wrote:
>>
>> So, the producer is run (at the moment) manually (command-line), one
>> message at a time.
>> Kafka's tooling (different consumer group) shows that a message is added
>> each time.
>>
>> Since my last post, I have also added a UUID as the key, and that didn't
>> make a difference, so you are likely correct about de-dup.
>>
>> There is only a single partition on the topic, so it shouldn't be a
>> partitioning issue.
>>
>> I also noticed:
>> 1. If I send a message while the consumer topology is running (after the
>> first message), that message will be processed after a restart.
>>
>> 2.
>> Sending many messages while the consumer is running, and then doing many
>> restarts, will only process one of them. No idea what happens to the
>> others.
>>
>> I am utterly confused.
>>
>> And digging into the internals is not for the faint-hearted, but
>> kafka.poll() returns frequently with empty records.
>>
>> Will continue debugging that tomorrow...
>>
>> Niclas
>>
>> On Feb 18, 2018 18:50, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>
>>> Hi Niclas,
>>>
>>> Flink's Kafka consumer should not apply any deduplication. AFAIK, such a
>>> "feature" is not implemented.
>>> Do you produce into the topic that you want to read, or is the data in
>>> the topic static?
>>> If you do not produce into the topic while the consuming application is
>>> running, this might be an issue with the start position of the consumer
>>> [1].
>>>
>>> Best, Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>>
>>> 2018-02-18 8:14 GMT+01:00 Niclas Hedhman <nic...@apache.org>:
>>>
>>>> Hi,
>>>> I am pretty new to Flink, and I like what I see, and have started to
>>>> build my first application using it.
>>>> I must be missing something very fundamental. I have a
>>>> FlinkKafkaConsumer011, followed by a handful of filter, map and flatMap
>>>> functions, terminated with the standard CassandraSink. I have try..catch
>>>> in all my own maps/filters, and the first message in the queue is
>>>> processed after start-up, but any additional messages are ignored, i.e.
>>>> they never reach the first map(). Any additional messages are swallowed
>>>> (i.e. consumed but not forwarded).
>>>>
>>>> I suspect that some type of de-duplication is going on. The (test)
>>>> producer provides different values on each message, but there is no
>>>> "key" being passed to the KafkaProducer.
>>>>
>>>> Is that required? And if so, why?
>>>> Can I tell Flink, or Flink's KafkaConsumer, to ingest all messages and
>>>> not try to de-duplicate them?
>>>>
>>>> Thanks
>>>>
>>>> --
>>>> Niclas Hedhman, Software Developer
>>>> http://zest.apache.org - New Energy for Java
>>>
>>
>
> --
> Niclas Hedhman, Software Developer
> http://zest.apache.org - New Energy for Java
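The root cause found in this thread — two copies of com.datastax.driver.core.DefaultResultSetFuture on the classpath, one from the explicitly declared Cassandra driver and one bundled inside flink-connector-cassandra_2.11 — can be confirmed programmatically instead of by hours of stepping through a debugger. Below is a minimal, stdlib-only sketch; the class `DuplicateClassCheck` and method `locationsOf` are illustrative names, not part of Flink or the Cassandra driver. If more than one URL comes back for a class, two jars are shipping it.

```java
import java.net.URL;
import java.util.Collections;
import java.util.List;

public class DuplicateClassCheck {

    // Lists every classpath location that provides the given class.
    // More than one entry means multiple jars ship the same class, as with
    // the declared Cassandra driver vs. the copy bundled inside
    // flink-connector-cassandra_2.11 in this thread.
    static List<URL> locationsOf(String binaryName) throws Exception {
        String resource = binaryName.replace('.', '/') + ".class";
        return Collections.list(
            Thread.currentThread().getContextClassLoader().getResources(resource));
    }

    public static void main(String[] args) throws Exception {
        List<URL> urls = locationsOf("com.datastax.driver.core.DefaultResultSetFuture");
        System.out.println(urls.size() + " copies found:");
        for (URL url : urls) {
            System.out.println("  " + url);
        }
    }
}
```

Note that this only inspects the classpath of the JVM it runs in, so to diagnose a deployed job it should run inside the job's JVM (e.g. from a user function or a debugger's evaluate window), where it sees the same classloaders as the failing code.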