http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java deleted file mode 100644 index 42b9682..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - - -import org.junit.Test; - - -@SuppressWarnings("serial") -public class Kafka010ProducerITCase extends KafkaProducerTestBase { - - @Test - public void testCustomPartitioning() { - runCustomPartitioningTest(); - } - -}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java deleted file mode 100644 index f15fd45..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ /dev/null @@ -1,420 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.streaming.connectors.kafka; - -import kafka.admin.AdminUtils; -import kafka.common.KafkaException; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; -import kafka.utils.SystemTime$; -import kafka.utils.ZkUtils; -import org.I0Itec.zkclient.ZkClient; -import org.apache.commons.io.FileUtils; -import org.apache.curator.test.TestingServer; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; -import org.apache.flink.util.NetUtils; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.SecurityProtocol; -import org.apache.kafka.common.requests.MetadataResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.net.BindException; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.UUID; - -import static org.apache.flink.util.NetUtils.hostAndPortToUrlString; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * An implementation of the KafkaServerProvider for Kafka 0.10 - */ -public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { - - protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class); - private File tmpZkDir; - private File tmpKafkaParent; - private List<File> tmpKafkaDirs; - private List<KafkaServer> brokers; - private TestingServer zookeeper; - private String zookeeperConnectionString; - private String brokerConnectionString = ""; - private Properties standardProps; - private Properties additionalServerProperties; - private boolean secureMode = false; - // 6 seconds is default. Seems to be too small for travis. 30 seconds - private int zkTimeout = 30000; - - public String getBrokerConnectionString() { - return brokerConnectionString; - } - - @Override - public Properties getStandardProperties() { - return standardProps; - } - - @Override - public Properties getSecureProperties() { - Properties prop = new Properties(); - if(secureMode) { - prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT"); - prop.put("security.protocol", "SASL_PLAINTEXT"); - prop.put("sasl.kerberos.service.name", "kafka"); - - //add special timeout for Travis - prop.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout)); - prop.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout)); - prop.setProperty("metadata.fetch.timeout.ms","120000"); - } - return prop; - } - - @Override - public String getVersion() { - return "0.10"; - } - - @Override - public List<KafkaServer> getBrokers() { - return brokers; - } - - @Override - public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) { - return new FlinkKafkaConsumer010<>(topics, readSchema, props); - } - - @Override - public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) { - FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner); - prod.setFlushOnCheckpoint(true); - return new StreamSink<>(prod); - } - - - @Override - public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) { - FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner); - prod.setFlushOnCheckpoint(true); - return stream.addSink(prod); - } - - @Override - public KafkaOffsetHandler createOffsetHandler(Properties props) { - return new KafkaOffsetHandlerImpl(props); - } - - @Override - public void restartBroker(int leaderId) throws Exception { - brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId))); - } - - @Override - public int getLeaderToShutDown(String topic) throws Exception { - ZkUtils zkUtils = getZkUtils(); - try { - MetadataResponse.PartitionMetadata firstPart = null; - do { - if (firstPart != null) { - LOG.info("Unable to find leader. error code {}", firstPart.error().code()); - // not the first try. Sleep a bit - Thread.sleep(150); - } - - List<MetadataResponse.PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionMetadata(); - firstPart = partitionMetadata.get(0); - } - while (firstPart.error().code() != 0); - - return firstPart.leader().id(); - } finally { - zkUtils.close(); - } - } - - @Override - public int getBrokerId(KafkaServer server) { - return server.config().brokerId(); - } - - @Override - public boolean isSecureRunSupported() { - return true; - } - - @Override - public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) { - //increase the timeout since in Travis ZK connection takes long time for secure connection. - if(secureMode) { - //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout - numKafkaServers = 1; - zkTimeout = zkTimeout * 15; - } - - this.additionalServerProperties = additionalServerProperties; - this.secureMode = secureMode; - File tempDir = new File(System.getProperty("java.io.tmpdir")); - - tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); - assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs()); - - tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString())); - assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs()); - - tmpKafkaDirs = new ArrayList<>(numKafkaServers); - for (int i = 0; i < numKafkaServers; i++) { - File tmpDir = new File(tmpKafkaParent, "server-" + i); - assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); - tmpKafkaDirs.add(tmpDir); - } - - zookeeper = null; - brokers = null; - - try { - zookeeper = new TestingServer(- 1, tmpZkDir); - zookeeperConnectionString = zookeeper.getConnectString(); - LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString); - - LOG.info("Starting KafkaServer"); - brokers = new ArrayList<>(numKafkaServers); - - for (int i = 0; i < numKafkaServers; i++) { - brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); - - if(secureMode) { - brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ","; - } else { - brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ","; - } - } - - LOG.info("ZK and KafkaServer started."); - } - catch (Throwable t) { - t.printStackTrace(); - fail("Test setup failed: " + t.getMessage()); - } - - standardProps = new Properties(); - standardProps.setProperty("zookeeper.connect", zookeeperConnectionString); - standardProps.setProperty("bootstrap.servers", brokerConnectionString); - standardProps.setProperty("group.id", "flink-tests"); - standardProps.setProperty("enable.auto.commit", "false"); - standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout)); - standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout)); - standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.10 value) - standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!) - } - - @Override - public void shutdown() { - for (KafkaServer broker : brokers) { - if (broker != null) { - broker.shutdown(); - } - } - brokers.clear(); - - if (zookeeper != null) { - try { - zookeeper.stop(); - } - catch (Exception e) { - LOG.warn("ZK.stop() failed", e); - } - zookeeper = null; - } - - // clean up the temp spaces - - if (tmpKafkaParent != null && tmpKafkaParent.exists()) { - try { - FileUtils.deleteDirectory(tmpKafkaParent); - } - catch (Exception e) { - // ignore - } - } - if (tmpZkDir != null && tmpZkDir.exists()) { - try { - FileUtils.deleteDirectory(tmpZkDir); - } - catch (Exception e) { - // ignore - } - } - } - - public ZkUtils getZkUtils() { - ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")), - Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer()); - return ZkUtils.apply(creator, false); - } - - @Override - public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) { - // create topic with one client - LOG.info("Creating topic {}", topic); - - ZkUtils zkUtils = getZkUtils(); - try { - AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, kafka.admin.RackAwareMode.Enforced$.MODULE$); - } finally { - zkUtils.close(); - } - - // validate that the topic has been created - final long deadline = System.currentTimeMillis() + 30000; - do { - try { - if(secureMode) { - //increase wait time since in Travis ZK timeout occurs frequently - int wait = zkTimeout / 100; - LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic); - Thread.sleep(wait); - } else { - Thread.sleep(100); - } - } catch (InterruptedException e) { - // restore interrupted state - } - // we could use AdminUtils.topicExists(zkUtils, topic) here, but it's results are - // not always correct. - - // create a new ZK utils connection - ZkUtils checkZKConn = getZkUtils(); - if(AdminUtils.topicExists(checkZKConn, topic)) { - checkZKConn.close(); - return; - } - checkZKConn.close(); - } - while (System.currentTimeMillis() < deadline); - fail("Test topic could not be created"); - } - - @Override - public void deleteTestTopic(String topic) { - ZkUtils zkUtils = getZkUtils(); - try { - LOG.info("Deleting topic {}", topic); - - ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")), - Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer()); - - AdminUtils.deleteTopic(zkUtils, topic); - - zk.close(); - } finally { - zkUtils.close(); - } - } - - /** - * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed) - */ - protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception { - Properties kafkaProperties = new Properties(); - - // properties have to be Strings - kafkaProperties.put("advertised.host.name", KAFKA_HOST); - kafkaProperties.put("broker.id", Integer.toString(brokerId)); - kafkaProperties.put("log.dir", tmpFolder.toString()); - kafkaProperties.put("zookeeper.connect", zookeeperConnectionString); - kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024)); - kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024)); - - // for CI stability, increase zookeeper session timeout - kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout); - kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout); - if(additionalServerProperties != null) { - kafkaProperties.putAll(additionalServerProperties); - } - - final int numTries = 5; - - for (int i = 1; i <= numTries; i++) { - int kafkaPort = NetUtils.getAvailablePort(); - kafkaProperties.put("port", Integer.toString(kafkaPort)); - - //to support secure kafka cluster - if(secureMode) { - LOG.info("Adding Kafka secure configurations"); - kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); - kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); - kafkaProperties.putAll(getSecureProperties()); - } - - KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties); - - try { - scala.Option<String> stringNone = scala.Option.apply(null); - KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone); - server.startup(); - return server; - } - catch (KafkaException e) { - if (e.getCause() instanceof BindException) { - // port conflict, retry... - LOG.info("Port conflict when starting Kafka Broker. Retrying..."); - } - else { - throw e; - } - } - } - - throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts."); - } - - private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler { - - private final KafkaConsumer<byte[], byte[]> offsetClient; - - public KafkaOffsetHandlerImpl(Properties props) { - offsetClient = new KafkaConsumer<>(props); - } - - @Override - public Long getCommittedOffset(String topicName, int partition) { - OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition)); - return (committed != null) ? committed.offset() : null; - } - - @Override - public void close() { - offsetClient.close(); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties deleted file mode 100644 index fbeb110..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,30 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=INFO, testlogger - -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger -log4j.logger.org.apache.zookeeper=OFF, testlogger -log4j.logger.state.change.logger=OFF, testlogger -log4j.logger.kafka=OFF, testlogger http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml deleted file mode 100644 index 45b3b92..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml +++ /dev/null @@ -1,30 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> - </encoder> - </appender> - - <root level="WARN"> - <appender-ref ref="STDOUT"/> - </root> - <logger name="org.apache.flink.streaming" level="WARN"/> -</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml deleted file mode 100644 index f17f9ae..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml +++ /dev/null @@ -1,219 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-connectors</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-connector-kafka-0.8_2.10</artifactId> - <name>flink-connector-kafka-0.8</name> - - <packaging>jar</packaging> - - <!-- Allow users to pass custom connector versions --> - <properties> - <kafka.version>0.8.2.2</kafka.version> - </properties> - - <dependencies> - - <!-- core dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-shaded-curator-recipes</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-kafka-base_2.10</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - <!-- Projects depending on this project, - won't depend on flink-table. --> - <optional>true</optional> - </dependency> - - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_${scala.binary.version}</artifactId> - <version>${kafka.version}</version> - <exclusions> - <exclusion> - <groupId>com.sun.jmx</groupId> - <artifactId>jmxri</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jdmk</groupId> - <artifactId>jmxtools</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-simple</artifactId> - </exclusion> - <exclusion> - <groupId>net.sf.jopt-simple</groupId> - <artifactId>jopt-simple</artifactId> - </exclusion> - <exclusion> - <groupId>org.scala-lang</groupId> - <artifactId>scala-reflect</artifactId> - </exclusion> - <exclusion> - <groupId>org.scala-lang</groupId> - <artifactId>scala-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>com.yammer.metrics</groupId> - <artifactId>metrics-annotation</artifactId> - </exclusion> - <exclusion> - <groupId>org.xerial.snappy</groupId> - <artifactId>snappy-java</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- test dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-test</artifactId> - <version>${curator.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics-jmx</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-kafka-base_2.10</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_2.10</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-tests_2.10</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - </dependencies> - - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <!-- Enforce single fork execution due to heavy mini cluster use in the tests --> - <forkCount>1</forkCount> - </configuration> - </plugin> - <!-- Relocate curator --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - <id>shade-flink</id> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <artifactSet> - <includes combine.children="append"> - <include>org.apache.flink:flink-shaded-curator-recipes</include> - </includes> - </artifactSet> - <relocations combine.children="append"> - <relocation> - <pattern>org.apache.curator</pattern> - <shadedPattern>org.apache.flink.shaded.org.apache.curator</shadedPattern> - </relocation> - </relocations> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java deleted file mode 100644 index 0aacccd..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java +++ /dev/null @@ -1,398 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import kafka.api.OffsetRequest; -import kafka.cluster.Broker; -import kafka.common.ErrorMapping; -import kafka.javaapi.PartitionMetadata; -import kafka.javaapi.TopicMetadata; -import kafka.javaapi.TopicMetadataRequest; -import kafka.javaapi.consumer.SimpleConsumer; - -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; -import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; -import org.apache.flink.util.NetUtils; -import org.apache.flink.util.PropertiesUtil; -import org.apache.flink.util.SerializedValue; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.Node; - -import java.net.InetAddress; -import java.net.URL; -import java.net.UnknownHostException; -import java.nio.channels.ClosedChannelException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Properties; -import java.util.Random; - -import static org.apache.flink.util.PropertiesUtil.getInt; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from - * Apache Kafka 0.8.x. The consumer can run in multiple parallel instances, each of which will pull - * data from one or more Kafka partitions. - * - * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost - * during a failure, and that the computation processes elements "exactly once". - * (Note: These guarantees naturally assume that Kafka itself does not loose any data.)</p> - * - * <p>Flink's Kafka Consumer is designed to be compatible with Kafka's High-Level Consumer API (0.8.x). - * Most of Kafka's configuration variables can be used with this consumer as well: - * <ul> - * <li>socket.timeout.ms</li> - * <li>socket.receive.buffer.bytes</li> - * <li>fetch.message.max.bytes</li> - * <li>auto.offset.reset with the values "largest", "smallest"</li> - * <li>fetch.wait.max.ms</li> - * </ul> - * </li> - * </ul> - * - * <h1>Offset handling</h1> - * - * <p>Offsets whose records have been read and are checkpointed will be committed back to ZooKeeper - * by the offset handler. In addition, the offset handler finds the point where the source initially - * starts reading from the stream, when the streaming job is started.</p> - * - * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets - * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view - * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer - * has consumed a topic.</p> - * - * <p>If checkpointing is disabled, the consumer will periodically commit the current offset - * to Zookeeper.</p> - * - * <p>When using a Kafka topic to send data between Flink jobs, we recommend using the - * {@see TypeInformationSerializationSchema} and {@see TypeInformationKeyValueSerializationSchema}.</p> - * - * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer - * is constructed. That means that the client that submits the program needs to be able to - * reach the Kafka brokers or ZooKeeper.</p> - */ -public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { - - private static final long serialVersionUID = -6272159445203409112L; - - /** Configuration key for the number of retries for getting the partition info */ - public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry"; - - /** Default number of retries for getting the partition info. One retry means going through the full list of brokers */ - public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3; - - // ------------------------------------------------------------------------ - - /** The properties to parametrize the Kafka consumer and ZooKeeper client */ - private final Properties kafkaProperties; - - /** The behavior when encountering an invalid offset (see {@link OffsetRequest}) */ - private final long invalidOffsetBehavior; - - /** The interval in which to automatically commit (-1 if deactivated) */ - private final long autoCommitInterval; - - // ------------------------------------------------------------------------ - - /** - * Creates a new Kafka streaming source consumer for Kafka 0.8.x - * - * @param topic - * The name of the topic that should be consumed. - * @param valueDeserializer - * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties used to configure the Kafka consumer client, and the ZooKeeper client. - */ - public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props) { - this(Collections.singletonList(topic), valueDeserializer, props); - } - - /** - * Creates a new Kafka streaming source consumer for Kafka 0.8.x - * - * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value - * pairs, offsets, and topic names from Kafka. - * - * @param topic - * The name of the topic that should be consumed. - * @param deserializer - * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties used to configure the Kafka consumer client, and the ZooKeeper client. - */ - public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) { - this(Collections.singletonList(topic), deserializer, props); - } - - /** - * Creates a new Kafka streaming source consumer for Kafka 0.8.x - * - * This constructor allows passing multiple topics to the consumer. - * - * @param topics - * The Kafka topics to read from. - * @param deserializer - * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties that are used to configure both the fetcher and the offset handler. - */ - public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props) { - this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props); - } - - /** - * Creates a new Kafka streaming source consumer for Kafka 0.8.x - * - * This constructor allows passing multiple topics and a key/value deserialization schema. - * - * @param topics - * The Kafka topics to read from. - * @param deserializer - * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties that are used to configure both the fetcher and the offset handler. - */ - public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) { - super(topics, deserializer); - - checkNotNull(topics, "topics"); - this.kafkaProperties = checkNotNull(props, "props"); - - // validate the zookeeper properties - validateZooKeeperConfig(props); - - this.invalidOffsetBehavior = getInvalidOffsetBehavior(props); - this.autoCommitInterval = PropertiesUtil.getLong(props, "auto.commit.interval.ms", 60000); - } - - @Override - protected AbstractFetcher<T, ?> createFetcher( - SourceContext<T> sourceContext, - List<KafkaTopicPartition> thisSubtaskPartitions, - SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, - SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, - StreamingRuntimeContext runtimeContext) throws Exception { - - boolean useMetrics = !Boolean.valueOf(kafkaProperties.getProperty(KEY_DISABLE_METRICS, "false")); - - return new Kafka08Fetcher<>(sourceContext, thisSubtaskPartitions, - watermarksPeriodic, watermarksPunctuated, - runtimeContext, deserializer, kafkaProperties, - invalidOffsetBehavior, autoCommitInterval, useMetrics); - } - - @Override - protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) { - // Connect to a broker to get the partitions for all topics - List<KafkaTopicPartition> partitionInfos = - KafkaTopicPartition.dropLeaderData(getPartitionsForTopic(topics, kafkaProperties)); - - if (partitionInfos.size() == 0) { - throw new RuntimeException( - "Unable to retrieve any partitions for the requested topics " + topics + - ". Please check previous log entries"); - } - - if (LOG.isInfoEnabled()) { - logPartitionInfo(LOG, partitionInfos); - } - - return partitionInfos; - } - - // ------------------------------------------------------------------------ - // Kafka / ZooKeeper communication utilities - // ------------------------------------------------------------------------ - - /** - * Send request to Kafka to get partitions for topic. - * - * @param topics The name of the topics. - * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic. - */ - public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) { - String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); - final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES); - - checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); - String[] seedBrokers = seedBrokersConfString.split(","); - List<KafkaTopicPartitionLeader> partitions = new ArrayList<>(); - - final String clientId = "flink-kafka-consumer-partition-lookup"; - final int soTimeout = getInt(properties, "socket.timeout.ms", 30000); - final int bufferSize = getInt(properties, "socket.receive.buffer.bytes", 65536); - - Random rnd = new Random(); - retryLoop: for (int retry = 0; retry < numRetries; retry++) { - // we pick a seed broker randomly to avoid overloading the first broker with all the requests when the - // parallel source instances start. Still, we try all available brokers. - int index = rnd.nextInt(seedBrokers.length); - brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) { - String seedBroker = seedBrokers[index]; - LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries); - if (++index == seedBrokers.length) { - index = 0; - } - - URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker); - SimpleConsumer consumer = null; - try { - consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId); - - TopicMetadataRequest req = new TopicMetadataRequest(topics); - kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); - - List<TopicMetadata> metaData = resp.topicsMetadata(); - - // clear in case we have an incomplete list from previous tries - partitions.clear(); - for (TopicMetadata item : metaData) { - if (item.errorCode() != ErrorMapping.NoError()) { - // warn and try more brokers - LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions " + - "for " + topics.toString() + ". Error: " + ErrorMapping.exceptionFor(item.errorCode()).getMessage()); - continue brokersLoop; - } - if (!topics.contains(item.topic())) { - LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ..."); - continue brokersLoop; - } - for (PartitionMetadata part : item.partitionsMetadata()) { - Node leader = brokerToNode(part.leader()); - KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId()); - KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader); - partitions.add(pInfo); - } - } - break retryLoop; // leave the loop through the brokers - } catch (Exception e) { - //validates seed brokers in case of a ClosedChannelException - validateSeedBrokers(seedBrokers, e); - LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString() + "." + - "" + e.getClass() + ". Message: " + e.getMessage()); - LOG.debug("Detailed trace", e); - // we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata - try { - Thread.sleep(500); - } catch (InterruptedException e1) { - // sleep shorter. - } - } finally { - if (consumer != null) { - consumer.close(); - } - } - } // brokers loop - } // retries loop - return partitions; - } - - /** - * Turn a broker instance into a node instance - * @param broker broker instance - * @return Node representing the given broker - */ - private static Node brokerToNode(Broker broker) { - return new Node(broker.id(), broker.host(), broker.port()); - } - - /** - * Validate the ZK configuration, checking for required parameters - * @param props Properties to check - */ - protected static void validateZooKeeperConfig(Properties props) { - if (props.getProperty("zookeeper.connect") == null) { - throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties"); - } - if (props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) { - throw new IllegalArgumentException("Required property '" + ConsumerConfig.GROUP_ID_CONFIG - + "' has not been set in the properties"); - } - - try { - //noinspection ResultOfMethodCallIgnored - Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0")); - } - catch (NumberFormatException e) { - throw new IllegalArgumentException("Property 'zookeeper.session.timeout.ms' is not a valid integer"); - } - - try { - //noinspection ResultOfMethodCallIgnored - Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0")); - } - catch (NumberFormatException e) { - throw new IllegalArgumentException("Property 'zookeeper.connection.timeout.ms' is not a valid integer"); - } - } - - /** - * Validate that at least one seed broker is valid in case of a - * ClosedChannelException. - * - * @param seedBrokers - * array containing the seed brokers e.g. ["host1:port1", - * "host2:port2"] - * @param exception - * instance - */ - private static void validateSeedBrokers(String[] seedBrokers, Exception exception) { - if (!(exception instanceof ClosedChannelException)) { - return; - } - int unknownHosts = 0; - for (String broker : seedBrokers) { - URL brokerUrl = NetUtils.getCorrectHostnamePort(broker.trim()); - try { - InetAddress.getByName(brokerUrl.getHost()); - } catch (UnknownHostException e) { - unknownHosts++; - } - } - // throw meaningful exception if all the provided hosts are invalid - if (unknownHosts == seedBrokers.length) { - throw new IllegalArgumentException("All the servers provided in: '" - + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown hosts)"); - } - } - - private static long getInvalidOffsetBehavior(Properties config) { - final String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest"); - if (val.equals("none")) { - throw new IllegalArgumentException("Cannot use '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG - + "' value 'none'. Possible values: 'latest', 'largest', or 'earliest'."); - } - else if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is kafka 0.9 - return OffsetRequest.LatestTime(); - } else { - return OffsetRequest.EarliestTime(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java deleted file mode 100644 index 56ccd0b..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.util.serialization.DeserializationSchema; - -import java.util.Properties; - -/** - * THIS CLASS IS DEPRECATED. Use FlinkKafkaConsumer08 instead. - */ -@Deprecated -public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer08<T> { - - private static final long serialVersionUID = -5649906773771949146L; - - /** - * THIS CONSTRUCTOR IS DEPRECATED. Use FlinkKafkaConsumer08 instead. - */ - @Deprecated - public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props) { - super(topic, valueDeserializer, props); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java deleted file mode 100644 index 0520336..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.util.serialization.DeserializationSchema; - -import java.util.Properties; - -/** - * THIS CLASS IS DEPRECATED. Use FlinkKafkaConsumer08 instead. - */ -@Deprecated -public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer08<T> { - - private static final long serialVersionUID = -5649906773771949146L; - - /** - * THIS CONSTRUCTOR IS DEPRECATED. Use FlinkKafkaConsumer08 instead. - */ - @Deprecated - public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props) { - super(topic, valueDeserializer, props); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java deleted file mode 100644 index 1c2e0b7..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import java.util.Properties; - - -/** - * THIS CLASS IS DEPRECATED. Use FlinkKafkaProducer08 instead. - */ -@Deprecated -public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN> { - - @Deprecated - public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) { - super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), null); - } - - @Deprecated - public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) { - super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, null); - } - - @Deprecated - public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) { - super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); - - } - - @Deprecated - public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) { - super(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null); - } - - @Deprecated - public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) { - super(topicId, serializationSchema, producerConfig, null); - } - - @Deprecated - public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) { - super(topicId, serializationSchema, producerConfig, customPartitioner); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java deleted file mode 100644 index 65de5fc..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SerializationSchema; - -import java.util.Properties; - - -/** - * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.8. - * - * Please note that this producer does not have any reliability guarantees. - * - * @param <IN> Type of the messages to write into Kafka. - */ -public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> { - - private static final long serialVersionUID = 1L; - - // ------------------- Keyless serialization schema constructors ---------------------- - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to - * the topic. - * - * @param brokerList - * Comma separated addresses of the brokers - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined (keyless) serialization schema. - */ - public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>()); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to - * the topic. - * - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined (keyless) serialization schema. - * @param producerConfig - * Properties with the producer configuration. - */ - public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>()); - } - - /** - * The main constructor for creating a FlinkKafkaProducer. - * - * @param topicId The topic to write data to - * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. - * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions. - */ - public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); - - } - - // ------------------- Key/Value serialization schema constructors ---------------------- - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to - * the topic. - * - * @param brokerList - * Comma separated addresses of the brokers - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined serialization schema supporting key/value messages - */ - public FlinkKafkaProducer08(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) { - this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>()); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to - * the topic. - * - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined serialization schema supporting key/value messages - * @param producerConfig - * Properties with the producer configuration. - */ - public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) { - this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>()); - } - - /** - * The main constructor for creating a FlinkKafkaProducer. - * - * @param topicId The topic to write data to - * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. - * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions. - */ - public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { - super(topicId, serializationSchema, producerConfig, customPartitioner); - } - - @Override - protected void flush() { - // The Kafka 0.8 producer doesn't support flushing, we wait here - // until all pending records are confirmed - synchronized (pendingRecordsLock) { - while (pendingRecords > 0) { - try { - pendingRecordsLock.wait(); - } catch (InterruptedException e) { - // this can be interrupted when the Task has been cancelled. - // by throwing an exception, we ensure that this checkpoint doesn't get confirmed - throw new RuntimeException("Flushing got interrupted while checkpointing", e); - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java deleted file mode 100644 index b155576..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.table.Row; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.SerializationSchema; - -import java.util.Properties; - -/** - * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format. - */ -public class Kafka08JsonTableSink extends KafkaJsonTableSink { - - /** - * Creates {@link KafkaTableSink} for Kafka 0.8 - * - * @param topic topic in Kafka to which table is written - * @param properties properties to connect to Kafka - * @param partitioner Kafka partitioner - */ - public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) { - super(topic, properties, partitioner); - } - - @Override - protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) { - return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner); - } - - @Override - protected Kafka08JsonTableSink createCopy() { - return new Kafka08JsonTableSink(topic, properties, partitioner); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java deleted file mode 100644 index 63bb57e..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.sources.StreamTableSource; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; - -import java.util.Properties; - -/** - * Kafka {@link StreamTableSource} for Kafka 0.8. - */ -public class Kafka08JsonTableSource extends KafkaJsonTableSource { - - /** - * Creates a Kafka 0.8 JSON {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - public Kafka08JsonTableSource( - String topic, - Properties properties, - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { - - super(topic, properties, fieldNames, fieldTypes); - } - - /** - * Creates a Kafka 0.8 JSON {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - public Kafka08JsonTableSource( - String topic, - Properties properties, - String[] fieldNames, - Class<?>[] fieldTypes) { - - super(topic, properties, fieldNames, fieldTypes); - } - - @Override - FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { - return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java deleted file mode 100644 index 8f51237..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.sources.StreamTableSource; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; - -import java.util.Properties; - -/** - * Kafka {@link StreamTableSource} for Kafka 0.8. - */ -public class Kafka08TableSource extends KafkaTableSource { - - /** - * Creates a Kafka 0.8 {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - public Kafka08TableSource( - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { - - super(topic, properties, deserializationSchema, fieldNames, fieldTypes); - } - - /** - * Creates a Kafka 0.8 {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - public Kafka08TableSource( - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - String[] fieldNames, - Class<?>[] fieldTypes) { - - super(topic, properties, deserializationSchema, fieldNames, fieldTypes); - } - - @Override - FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { - return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java deleted file mode 100644 index 23ff276..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java +++ /dev/null @@ -1,507 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import static java.util.Objects.requireNonNull; - -/** - * A special form of blocking queue with two additions: - * <ol> - * <li>The queue can be closed atomically when empty. Adding elements after the queue - * is closed fails. This allows queue consumers to atomically discover that no elements - * are available and mark themselves as shut down.</li> - * <li>The queue allows to poll batches of elements in one polling call.</li> - * </ol> - * - * The queue has no capacity restriction and is safe for multiple producers and consumers. - * - * <p>Note: Null elements are prohibited. - * - * @param <E> The type of elements in the queue. - */ -public class ClosableBlockingQueue<E> { - - /** The lock used to make queue accesses and open checks atomic */ - private final ReentrantLock lock; - - /** The condition on which blocking get-calls wait if the queue is empty */ - private final Condition nonEmpty; - - /** The deque of elements */ - private final ArrayDeque<E> elements; - - /** Flag marking the status of the queue */ - private volatile boolean open; - - // ------------------------------------------------------------------------ - - /** - * Creates a new empty queue. - */ - public ClosableBlockingQueue() { - this(10); - } - - /** - * Creates a new empty queue, reserving space for at least the specified number - * of elements. The queu can still grow, of more elements are added than the - * reserved space. - * - * @param initialSize The number of elements to reserve space for. - */ - public ClosableBlockingQueue(int initialSize) { - this.lock = new ReentrantLock(true); - this.nonEmpty = this.lock.newCondition(); - - this.elements = new ArrayDeque<>(initialSize); - this.open = true; - - - } - - /** - * Creates a new queue that contains the given elements. - * - * @param initialElements The elements to initially add to the queue. - */ - public ClosableBlockingQueue(Collection<? extends E> initialElements) { - this(initialElements.size()); - this.elements.addAll(initialElements); - } - - // ------------------------------------------------------------------------ - // Size and status - // ------------------------------------------------------------------------ - - /** - * Gets the number of elements currently in the queue. - * @return The number of elements currently in the queue. - */ - public int size() { - lock.lock(); - try { - return elements.size(); - } finally { - lock.unlock(); - } - } - - /** - * Checks whether the queue is empty (has no elements). - * @return True, if the queue is empty; false, if it is non-empty. - */ - public boolean isEmpty() { - return size() == 0; - } - - /** - * Checks whether the queue is currently open, meaning elements can be added and polled. - * @return True, if the queue is open; false, if it is closed. - */ - public boolean isOpen() { - return open; - } - - /** - * Tries to close the queue. Closing the queue only succeeds when no elements are - * in the queue when this method is called. Checking whether the queue is empty, and - * marking the queue as closed is one atomic operation. - * - * @return True, if the queue is closed, false if the queue remains open. - */ - public boolean close() { - lock.lock(); - try { - if (open) { - if (elements.isEmpty()) { - open = false; - nonEmpty.signalAll(); - return true; - } else { - return false; - } - } - else { - // already closed - return true; - } - } finally { - lock.unlock(); - } - } - - // ------------------------------------------------------------------------ - // Adding / Removing elements - // ------------------------------------------------------------------------ - - /** - * Tries to add an element to the queue, if the queue is still open. Checking whether the queue - * is open and adding the element is one atomic operation. - * - * <p>Unlike the {@link #add(Object)} method, this method never throws an exception, - * but only indicates via the return code if the element was added or the - * queue was closed. - * - * @param element The element to add. - * @return True, if the element was added, false if the queue was closes. - */ - public boolean addIfOpen(E element) { - requireNonNull(element); - - lock.lock(); - try { - if (open) { - elements.addLast(element); - if (elements.size() == 1) { - nonEmpty.signalAll(); - } - } - return open; - } finally { - lock.unlock(); - } - } - - /** - * Adds the element to the queue, or fails with an exception, if the queue is closed. - * Checking whether the queue is open and adding the element is one atomic operation. - * - * @param element The element to add. - * @throws IllegalStateException Thrown, if the queue is closed. - */ - public void add(E element) throws IllegalStateException { - requireNonNull(element); - - lock.lock(); - try { - if (open) { - elements.addLast(element); - if (elements.size() == 1) { - nonEmpty.signalAll(); - } - } else { - throw new IllegalStateException("queue is closed"); - } - } finally { - lock.unlock(); - } - } - - /** - * Returns the queue's next element without removing it, if the queue is non-empty. - * Otherwise, returns null. - * - * <p>The method throws an {@code IllegalStateException} if the queue is closed. - * Checking whether the queue is open and getting the next element is one atomic operation. - * - * <p>This method never blocks. - * - * @return The queue's next element, or null, if the queue is empty. - * @throws IllegalStateException Thrown, if the queue is closed. - */ - public E peek() { - lock.lock(); - try { - if (open) { - if (elements.size() > 0) { - return elements.getFirst(); - } else { - return null; - } - } else { - throw new IllegalStateException("queue is closed"); - } - } finally { - lock.unlock(); - } - } - - /** - * Returns the queue's next element and removes it, the queue is non-empty. - * Otherwise, this method returns null. - * - * <p>The method throws an {@code IllegalStateException} if the queue is closed. - * Checking whether the queue is open and removing the next element is one atomic operation. - * - * <p>This method never blocks. - * - * @return The queue's next element, or null, if the queue is empty. - * @throws IllegalStateException Thrown, if the queue is closed. - */ - public E poll() { - lock.lock(); - try { - if (open) { - if (elements.size() > 0) { - return elements.removeFirst(); - } else { - return null; - } - } else { - throw new IllegalStateException("queue is closed"); - } - } finally { - lock.unlock(); - } - } - - /** - * Returns all of the queue's current elements in a list, if the queue is non-empty. - * Otherwise, this method returns null. - * - * <p>The method throws an {@code IllegalStateException} if the queue is closed. - * Checking whether the queue is open and removing the elements is one atomic operation. - * - * <p>This method never blocks. - * - * @return All of the queue's elements, or null, if the queue is empty. - * @throws IllegalStateException Thrown, if the queue is closed. - */ - public List<E> pollBatch() { - lock.lock(); - try { - if (open) { - if (elements.size() > 0) { - ArrayList<E> result = new ArrayList<>(elements); - elements.clear(); - return result; - } else { - return null; - } - } else { - throw new IllegalStateException("queue is closed"); - } - } finally { - lock.unlock(); - } - } - - /** - * Returns the next element in the queue. If the queue is empty, this method - * waits until at least one element is added. - * - * <p>The method throws an {@code IllegalStateException} if the queue is closed. - * Checking whether the queue is open and removing the next element is one atomic operation. - * - * @return The next element in the queue, never null. - * - * @throws IllegalStateException Thrown, if the queue is closed. - * @throws InterruptedException Throw, if the thread is interrupted while waiting for an - * element to be added. - */ - public E getElementBlocking() throws InterruptedException { - lock.lock(); - try { - while (open && elements.isEmpty()) { - nonEmpty.await(); - } - - if (open) { - return elements.removeFirst(); - } else { - throw new IllegalStateException("queue is closed"); - } - } finally { - lock.unlock(); - } - } - - /** - * Returns the next element in the queue. If the queue is empty, this method - * waits at most a certain time until an element becomes available. If no element - * is available after that time, the method returns null. - * - * <p>The method throws an {@code IllegalStateException} if the queue is closed. - * Checking whether the queue is open and removing the next element is one atomic operation. - * - * @param timeoutMillis The number of milliseconds to block, at most. - * @return The next element in the queue, or null, if the timeout expires before an element is available. - * - * @throws IllegalStateException Thrown, if the queue is closed. - * @throws InterruptedException Throw, if the thread is interrupted while waiting for an - * element to be added. - */ - public E getElementBlocking(long timeoutMillis) throws InterruptedException { - if (timeoutMillis == 0L) { - // wait forever case - return getElementBlocking(); - } else if (timeoutMillis < 0L) { - throw new IllegalArgumentException("invalid timeout"); - } - - final long deadline = System.currentTimeMillis() + timeoutMillis; - - lock.lock(); - try { - while (open && elements.isEmpty() && timeoutMillis > 0) { - nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS); - timeoutMillis = deadline - System.currentTimeMillis(); - } - - if (!open) { - throw new IllegalStateException("queue is closed"); - } - else if (elements.isEmpty()) { - return null; - } else { - return elements.removeFirst(); - } - } finally { - lock.unlock(); - } - } - - /** - * Gets all the elements found in the list, or blocks until at least one element - * was added. If the queue is empty when this method is called, it blocks until - * at least one element is added. - * - * <p>This method always returns a list with at least one element. - * - * <p>The method throws an {@code IllegalStateException} if the queue is closed. - * Checking whether the queue is open and removing the next element is one atomic operation. - * - * @return A list with all elements in the queue, always at least one element. - * - * @throws IllegalStateException Thrown, if the queue is closed. - * @throws InterruptedException Throw, if the thread is interrupted while waiting for an - * element to be added. - */ - public List<E> getBatchBlocking() throws InterruptedException { - lock.lock(); - try { - while (open && elements.isEmpty()) { - nonEmpty.await(); - } - if (open) { - ArrayList<E> result = new ArrayList<>(elements); - elements.clear(); - return result; - } else { - throw new IllegalStateException("queue is closed"); - } - } finally { - lock.unlock(); - } - } - - /** - * Gets all the elements found in the list, or blocks until at least one element - * was added. This method is similar as {@link #getBatchBlocking()}, but takes - * a number of milliseconds that the method will maximally wait before returning. - * - * <p>This method never returns null, but an empty list, if the queue is empty when - * the method is called and the request times out before an element was added. - * - * <p>The method throws an {@code IllegalStateException} if the queue is closed. - * Checking whether the queue is open and removing the next element is one atomic operation. - * - * @param timeoutMillis The number of milliseconds to wait, at most. - * @return A list with all elements in the queue, possible an empty list. - * - * @throws IllegalStateException Thrown, if the queue is closed. - * @throws InterruptedException Throw, if the thread is interrupted while waiting for an - * element to be added. - */ - public List<E> getBatchBlocking(long timeoutMillis) throws InterruptedException { - if (timeoutMillis == 0L) { - // wait forever case - return getBatchBlocking(); - } else if (timeoutMillis < 0L) { - throw new IllegalArgumentException("invalid timeout"); - } - - final long deadline = System.currentTimeMillis() + timeoutMillis; - - lock.lock(); - try { - while (open && elements.isEmpty() && timeoutMillis > 0) { - nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS); - timeoutMillis = deadline - System.currentTimeMillis(); - } - - if (!open) { - throw new IllegalStateException("queue is closed"); - } - else if (elements.isEmpty()) { - return Collections.emptyList(); - } - else { - ArrayList<E> result = new ArrayList<>(elements); - elements.clear(); - return result; - } - } finally { - lock.unlock(); - } - } - - // ------------------------------------------------------------------------ - // Standard Utilities - // ------------------------------------------------------------------------ - - @Override - public int hashCode() { - int hashCode = 17; - for (E element : elements) { - hashCode = 31 * hashCode + element.hashCode(); - } - return hashCode; - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } else if (obj != null && obj.getClass() == ClosableBlockingQueue.class) { - @SuppressWarnings("unchecked") - ClosableBlockingQueue<E> that = (ClosableBlockingQueue<E>) obj; - - if (this.elements.size() == that.elements.size()) { - Iterator<E> thisElements = this.elements.iterator(); - for (E thatNext : that.elements) { - E thisNext = thisElements.next(); - if (!(thisNext == null ? thatNext == null : thisNext.equals(thatNext))) { - return false; - } - } - return true; - } else { - return false; - } - } else { - return false; - } - } - - @Override - public String toString() { - return elements.toString(); - } -}