Hi,
I just tried the Kafka 0.10 connector again, and I could not reproduce the
issue you are reporting.

This is my test job:

// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromArgs(args);

if(parameterTool.getNumberOfParameters() < 4) {
   System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
         "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> " +
         "--group.id <some id>");
   return;
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface

DataStream<String> messageStream = env
      .addSource(new FlinkKafkaConsumer010<>(
            parameterTool.getRequired("topic"),
            new SimpleStringSchema(),
            parameterTool.getProperties()));

// write kafka stream to standard out.
messageStream.print();

env.execute("Read from Kafka example");
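
If the type checker still complains on your side, it may help to check which supertypes the consumer class really has on your classpath and which jar each of them was loaded from. A mismatch between the connector jar and the Flink streaming jar is one possible cause, but I have not confirmed that this is what happens in your setup. Below is a minimal sketch that uses only the JDK. The class name TypeCheckDiagnostics and its helper methods are made up for illustration, and ArrayList/List are stand-ins for FlinkKafkaConsumer010/SourceFunction so that it runs without Flink.

```java
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class TypeCheckDiagnostics {

    /** Collects the class itself, all superclasses and all transitively implemented interfaces. */
    public static Set<Class<?>> allSupertypes(Class<?> type) {
        Set<Class<?>> result = new LinkedHashSet<>();
        collect(type, result);
        return result;
    }

    private static void collect(Class<?> type, Set<Class<?>> seen) {
        // Stop at the top of the hierarchy or when a type was already visited.
        if (type == null || !seen.add(type)) {
            return;
        }
        collect(type.getSuperclass(), seen);
        for (Class<?> iface : type.getInterfaces()) {
            collect(iface, seen);
        }
    }

    /** Returns the location a class was loaded from, or a note if no code source is available. */
    public static String loadedFrom(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return source == null ? "(no code source, e.g. JDK class)" : String.valueOf(source.getLocation());
    }

    public static void main(String[] args) {
        // Stand-ins (assumption): in a real project, use
        // FlinkKafkaConsumer010.class and SourceFunction.class instead.
        Class<?> implementation = ArrayList.class;
        Class<?> required = List.class;

        System.out.println(required.getName() + " assignable from "
                + implementation.getName() + ": "
                + required.isAssignableFrom(implementation));

        // Print every supertype together with the jar or directory it came from.
        for (Class<?> t : allSupertypes(implementation)) {
            System.out.println("  " + t.getName() + " <- " + loadedFrom(t));
        }
    }
}
```

If the required interface does not show up in the list, or if it shows up but was loaded from an unexpected jar, that points to a dependency problem rather than to the connector code itself.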


On Thu, Nov 3, 2016 at 1:48 PM, Dominik Safaric <dominiksafa...@gmail.com>
wrote:

> Hi Robert,
>
> I think the easiest way to get Kafka 0.10 running with Flink is to use the
> Kafka 0.10 connector in the current Flink master.
>
>
> Well, I’ve already built the Kafka 0.10 connector from master, but
> unfortunately I keep getting a type checker error saying that the type of
> FlinkKafkaConsumer010 and the type required by StreamExecutionEnvironment
> are not compatible. That is, addSource requires a subclass of
> SourceFunction<T>, whereas the instance of the FlinkKafkaConsumer010 class
> is of type FlinkKafkaConsumer010<T>.
>
> I find this quite strange, because I would assume that a
> FlinkKafkaConsumer instance is of type SourceFunction. However, the
> same thing also happened when I built the FlinkKafkaConsumer09.
>
> Any hint what might be going on?
>
> I’ve built the jar distribution with a clean Maven package (without running
> the tests).
>
> Thanks,
> Dominik
>
> On 3 Nov 2016, at 13:29, Robert Metzger <rmetz...@apache.org> wrote:
>
> Hi Dominik,
>
> Some of Kafka's APIs changed between Kafka 0.9 and 0.10, so you cannot
> compile the Kafka 0.9 connector against Kafka 0.10 dependencies.
>
> I think the easiest way to get Kafka 0.10 running with Flink is to use the
> Kafka 0.10 connector in the current Flink master.
> You can probably copy the connector's code into your own project and use
> the new connector from there.
>
> Regards,
> Robert
>
>
> On Thu, Nov 3, 2016 at 8:05 AM, Dominik Safaric <dominiksafa...@gmail.com>
> wrote:
>
>> Dear all,
>>
>> Although Flink 1.2 will roll out a Flink Kafka 0.10.x connector, I want
>> to use the Flink Kafka 0.9 connector in conjunction with the Kafka 0.10.x
>> versions.
>>
>> The reason behind this is that we are currently evaluating Flink as part
>> of an empirical research project, hence a stable release is required. In
>> addition, we require the Kafka 0.10.x versions because, since 0.10.0,
>> Kafka supports consumer and producer interceptors and message timestamps.
>>
>> To make the 0.9 connector support Kafka 0.10.0, for example, I have so far
>> changed the Kafka dependency of the Flink Kafka 0.9 connector to the
>> required Kafka version and built the project. However, when I imported the
>> jar and added the source to the StreamExecutionEnvironment, a type error
>> occurred stating that the addSource function requires a class deriving from
>> the SourceFunction interface.
>>
>> *Hence, what has gone wrong during the build?* I assume a dependency
>> issue.
>>
>> Next, I tried simply overriding the dependencies of the Flink Kafka
>> connector within the project's pom.xml; however, there is obviously a
>> slight API mismatch, so this cannot be done.
>>
>> I would really appreciate it if anyone could provide some guidance on how
>> to successfully build the Flink Kafka connector supporting Kafka 0.10.x
>> versions.
>>
>> Thanks in advance,
>> Dominik
>>
>
>
>
