[ https://issues.apache.org/jira/browse/FLINK-7964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591513#comment-16591513 ]

ASF GitHub Bot commented on FLINK-7964:
---------------------------------------

yanghua commented on a change in pull request #6577: [FLINK-7964] Add Apache 
Kafka 1.0/1.1 connectors
URL: https://github.com/apache/flink/pull/6577#discussion_r212599001
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ITCase.java
 ##########
 @@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * IT cases for Kafka 1.0.
+ */
+public class Kafka10ITCase extends KafkaConsumerTestBase {
 
 Review comment:
   @pnowojski I tried to extend `Kafka011ITCase`:
   
   ```java
   public class Kafka10ITCase extends Kafka011ITCase
   ```
   
   but the `prepare` method:
   
   ```java
   @BeforeClass
   public static void prepare() throws ClassNotFoundException {
       KafkaProducerTestBase.prepare();    //here
       ((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
   }
   ```
   triggers the following method call chain:
   
   ```java
   protected static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy) throws ClassNotFoundException {

       // dynamically load the implementation for the test
       // here it will load KafkaTestEnvironmentImpl from connector 0.11, not 1.0
       Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");

       kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);

       LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
   ```
   
   `startClusters` is a static method with a hard-coded class name, so `clazz` resolves to the `KafkaTestEnvironmentImpl` from the 0.11 module, and we cannot use polymorphism to get an instance of the `KafkaTestEnvironmentImpl` from the 1.0 module.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Apache Kafka 1.0/1.1 connectors
> -----------------------------------
>
>                 Key: FLINK-7964
>                 URL: https://issues.apache.org/jira/browse/FLINK-7964
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: Hai Zhou
>            Assignee: vinoyang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> Kafka 1.0.0 is no mere bump of the version number. The Apache Kafka Project 
> Management Committee has packed a number of valuable enhancements into the 
> release. Here is a summary of a few of them:
> * Since its introduction in version 0.10, the Streams API has become hugely 
> popular among Kafka users, including the likes of Pinterest, Rabobank, 
> Zalando, and The New York Times. In 1.0, the API continues to evolve at a 
> healthy pace. To begin with, the builder API has been improved (KIP-120). A 
> new API has been added to expose the state of active tasks at runtime 
> (KIP-130). The new cogroup API makes it much easier to deal with partitioned 
> aggregates with fewer StateStores and fewer moving parts in your code 
> (KIP-150). Debuggability gets easier with enhancements to the print() and 
> writeAsText() methods (KIP-160). And if that’s not enough, check out KIP-138 
> and KIP-161 too. For more on streams, check out the Apache Kafka Streams 
> documentation, including some helpful new tutorial videos.
> * Operating Kafka at scale requires that the system remain observable, and to 
> make that easier, we’ve made a number of improvements to metrics. These are 
> too many to summarize without becoming tedious, but Connect metrics have been 
> significantly improved (KIP-196), a litany of new health check metrics are 
> now exposed (KIP-188), and we now have a global topic and partition count 
> (KIP-168). Check out KIP-164 and KIP-187 for even more.
> * We now support Java 9, leading, among other things, to significantly faster 
> TLS and CRC32C implementations. Over-the-wire encryption will be faster now, 
> which will keep Kafka fast and compute costs low when encryption is enabled.
> * In keeping with the security theme, KIP-152 cleans up the error handling on 
> Simple Authentication and Security Layer (SASL) authentication attempts. 
> Previously, some authentication error conditions were indistinguishable from 
> broker failures and were not logged in a clear way. This is cleaner now.
> * Kafka can now tolerate disk failures better. Historically, JBOD storage 
> configurations have not been recommended, but the architecture has 
> nevertheless been tempting: after all, why not rely on Kafka’s own 
> replication mechanism to protect against storage failure rather than using 
> RAID? With KIP-112, Kafka now handles disk failure more gracefully. A single 
> disk failure in a JBOD broker will not bring the entire broker down; rather, 
> the broker will continue serving any log files that remain on functioning 
> disks.
> * Since release 0.11.0, the idempotent producer (which is the producer used 
> in the presence of a transaction, which of course is the producer we use for 
> exactly-once processing) required max.in.flight.requests.per.connection to be 
> equal to one. As anyone who has written or tested a wire protocol can attest, 
> this put an upper bound on throughput. Thanks to KAFKA-5949, this can now be 
> as large as five, relaxing the throughput constraint quite a bit.
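
For illustration, the relaxed constraint described in the last point corresponds to a producer configuration along these lines (an illustrative sketch, not part of this issue's changes; the bootstrap address is a placeholder):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // The idempotent producer underlies transactions / exactly-once processing.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // Before KAFKA-5949 this had to be 1 for the idempotent producer;
        // since Kafka 1.0 it may be as large as 5.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ... send records as usual ...
        }
    }
}
```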



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
