[
https://issues.apache.org/jira/browse/GOBBLIN-1325?focusedWorklogId=519271&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-519271
]
ASF GitHub Bot logged work on GOBBLIN-1325:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 02/Dec/20 22:01
Start Date: 02/Dec/20 22:01
Worklog Time Spent: 10m
Work Description: sv2000 commented on a change in pull request #3163:
URL: https://github.com/apache/incubator-gobblin/pull/3163#discussion_r534504406
##########
File path:
gobblin-modules/gobblin-kafka-11/src/main/java/org/apache/gobblin/kafka/serialize/LiAvroSerializer.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.serialize;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+
+
+/**
+ * LinkedIn's implementation of Avro-schema based serialization for Kafka
+ * TODO: Implement this for IndexedRecord not just GenericRecord
+ *
+ */
+public class LiAvroSerializer extends LiAvroSerializerBase implements
Serializer<GenericRecord> {
Review comment:
    Are these SerDe classes needed? These are LI-specific implementations
which we should deprecate in older versions and skip in newer versions.
##########
File path:
gobblin-modules/gobblin-kafka-11/src/main/java/org/apache/gobblin/kafka/client/Kafka11ConsumerClient.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.kafka.client;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import javax.annotation.Nonnull;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import
org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A {@link GobblinKafkaConsumerClient} that uses kafka 1.1 consumer client.
Use {@link Factory#create(Config)} to create
+ * new Kafka1.1ConsumerClients. The {@link Config} used to create clients must
have required key {@value #GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY}
+ *
+ * @param <K> Message key type
+ * @param <V> Message value type
+ */
+public class Kafka11ConsumerClient<K, V> extends
AbstractBaseKafkaConsumerClient {
Review comment:
Can we add implementations of subscribe() methods similar to what is
done for Kafka09ConsumerClient?
##########
File path:
gobblin-modules/gobblin-kafka-11/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZKStringSerializer$;
+import kafka.zk.EmbeddedZookeeper;
+
+import org.apache.gobblin.test.TestUtils;
+
+public class KafkaClusterTestBase extends KafkaTestBase {
+
+ int clusterCount;
+ EmbeddedZookeeper _zkServer;
+ String _zkConnectString;
+ ZkClient _zkClient;
+ List<KafkaServer> kafkaBrokerList = new ArrayList<KafkaServer>();
+ List<Integer> kafkaBrokerPortList = new ArrayList<Integer>();
+
+ public KafkaClusterTestBase(int clusterCount) throws InterruptedException,
RuntimeException {
+ super();
+ this.clusterCount = clusterCount;
+ }
+
+ public void startCluster() {
+ // Start Zookeeper.
+ _zkServer = new EmbeddedZookeeper();
+ _zkConnectString = "127.0.0.1:"+_zkServer.port();
Review comment:
    Nit: add whitespace around "+".
##########
File path:
gobblin-modules/gobblin-kafka-11/src/main/java/org/apache/gobblin/kafka/client/Kafka11ConsumerClient.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.kafka.client;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import javax.annotation.Nonnull;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import
org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A {@link GobblinKafkaConsumerClient} that uses kafka 1.1 consumer client.
Use {@link Factory#create(Config)} to create
+ * new Kafka1.1ConsumerClients. The {@link Config} used to create clients must
have required key {@value #GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY}
+ *
+ * @param <K> Message key type
+ * @param <V> Message value type
+ */
+public class Kafka11ConsumerClient<K, V> extends
AbstractBaseKafkaConsumerClient {
+
+ private static final String CLIENT_BOOTSTRAP_SERVERS_KEY =
"bootstrap.servers";
+ private static final String CLIENT_ENABLE_AUTO_COMMIT_KEY =
"enable.auto.commit";
+ private static final String CLIENT_SESSION_TIMEOUT_KEY =
"session.timeout.ms";
+ private static final String CLIENT_KEY_DESERIALIZER_CLASS_KEY =
"key.deserializer";
+ private static final String CLIENT_VALUE_DESERIALIZER_CLASS_KEY =
"value.deserializer";
+ private static final String CLIENT_GROUP_ID = "group.id";
+
+ private static final String DEFAULT_ENABLE_AUTO_COMMIT =
Boolean.toString(false);
+ private static final String DEFAULT_KEY_DESERIALIZER =
+ "org.apache.kafka.common.serialization.StringDeserializer";
+ private static final String DEFAULT_GROUP_ID = "kafka11";
+
+ public static final String GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY =
CONFIG_PREFIX
+ + CLIENT_KEY_DESERIALIZER_CLASS_KEY;
+ public static final String GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY =
CONFIG_PREFIX
+ + CLIENT_VALUE_DESERIALIZER_CLASS_KEY;
+
+ private static final Config FALLBACK =
+ ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+ .put(CLIENT_ENABLE_AUTO_COMMIT_KEY,
DEFAULT_ENABLE_AUTO_COMMIT)
+ .put(CLIENT_KEY_DESERIALIZER_CLASS_KEY,
DEFAULT_KEY_DESERIALIZER)
+ .put(CLIENT_GROUP_ID, DEFAULT_GROUP_ID)
+ .build());
+
+ private final Consumer<K, V> consumer;
+
+ private Kafka11ConsumerClient(Config config) {
+ super(config);
+
Preconditions.checkArgument(config.hasPath(GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY),
+ "Missing required property " +
GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY);
+
+ Properties props = new Properties();
+ props.put(CLIENT_BOOTSTRAP_SERVERS_KEY,
Joiner.on(",").join(super.brokers));
+ props.put(CLIENT_SESSION_TIMEOUT_KEY, super.socketTimeoutMillis);
+
+ // grab all the config under "source.kafka" and add the defaults as
fallback.
+ Config baseConfig = ConfigUtils.getConfigOrEmpty(config,
CONFIG_NAMESPACE).withFallback(FALLBACK);
+ // get the "source.kafka.consumerConfig" config for extra config to
pass along to Kafka with a fallback to the
+ // shared config that start with "gobblin.kafka.sharedConfig"
+ Config specificConfig = ConfigUtils.getConfigOrEmpty(baseConfig,
CONSUMER_CONFIG).withFallback(
+ ConfigUtils.getConfigOrEmpty(config,
ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+ // The specific config overrides settings in the base config
+ Config scopedConfig =
specificConfig.withFallback(baseConfig.withoutPath(CONSUMER_CONFIG));
+ props.putAll(ConfigUtils.configToProperties(scopedConfig));
+
+ this.consumer = new KafkaConsumer<>(props);
+ }
+
+ public Kafka11ConsumerClient(Config config, Consumer<K, V> consumer) {
+ super(config);
+ this.consumer = consumer;
+ }
+
+ @Override
+ public List<KafkaTopic> getTopics() {
+ return FluentIterable.from(this.consumer.listTopics().entrySet())
+ .transform(new Function<Entry<String, List<PartitionInfo>>,
KafkaTopic>() {
+ @Override
+ public KafkaTopic apply(Entry<String, List<PartitionInfo>>
filteredTopicEntry) {
+ return new KafkaTopic(filteredTopicEntry.getKey(),
Lists.transform(filteredTopicEntry.getValue(),
+ PARTITION_INFO_TO_KAFKA_PARTITION));
+ }
+ }).toList();
+ }
+
+ @Override
+ public long getEarliestOffset(KafkaPartition partition) throws
KafkaOffsetRetrievalFailureException {
+ TopicPartition topicPartition = new
TopicPartition(partition.getTopicName(), partition.getId());
+ List<TopicPartition> topicPartitionList =
Collections.singletonList(topicPartition);
+ this.consumer.assign(topicPartitionList);
+ this.consumer.seekToBeginning(topicPartitionList);
+
+ return this.consumer.position(topicPartition);
+ }
+
+ @Override
+ public long getLatestOffset(KafkaPartition partition) throws
KafkaOffsetRetrievalFailureException {
+ TopicPartition topicPartition = new
TopicPartition(partition.getTopicName(), partition.getId());
+ List<TopicPartition> topicPartitionList =
Collections.singletonList(topicPartition);
+ this.consumer.assign(topicPartitionList);
+ this.consumer.seekToEnd(topicPartitionList);
+
+ return this.consumer.position(topicPartition);
+ }
+
+ @Override
+ public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition,
long nextOffset, long maxOffset) {
+
+ if (nextOffset > maxOffset) {
+ return null;
+ }
+
+ this.consumer.assign(Lists.newArrayList(new
TopicPartition(partition.getTopicName(), partition.getId())));
+ this.consumer.seek(new TopicPartition(partition.getTopicName(),
partition.getId()), nextOffset);
+ ConsumerRecords<K, V> consumerRecords =
consumer.poll(super.fetchTimeoutMillis);
Review comment:
Add a null check here. Not sure if consumer.poll() is always guaranteed
to return a non-null value.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 519271)
Time Spent: 1h (was: 50m)
> Add Kafka 1.1 Module
> --------------------
>
> Key: GOBBLIN-1325
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1325
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-kafka
> Reporter: Hanghang Liu
> Assignee: Shirshanka Das
> Priority: Critical
> Time Spent: 1h
> Remaining Estimate: 0h
>
> Upgrade request got from the Verizon team: Kafka 1.1
--
This message was sent by Atlassian Jira
(v8.3.4#803005)