[ https://issues.apache.org/jira/browse/GEARPUMP-122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15303673#comment-15303673 ]
ASF GitHub Bot commented on GEARPUMP-122: ----------------------------------------- Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/25#discussion_r64864833 --- Diff: external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.kafka.lib.source + +import java.util.Properties + +import com.twitter.bijection.Injection +import kafka.common.TopicAndPartition +import org.apache.gearpump.streaming.kafka.KafkaSource +import org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread.FetchThreadFactory +import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient +import KafkaClient.KafkaClientFactory +import org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, FetchThread} +import org.apache.gearpump.streaming.kafka.lib.source.grouper.PartitionGrouper +import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient +import org.apache.gearpump.streaming.kafka.util.KafkaConfig +import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.streaming.transaction.api._ +import org.apache.gearpump.util.LogUtil +import org.apache.gearpump.{Message, TimeStamp} +import org.slf4j.Logger + +object AbstractKafkaSource { + private val LOG: Logger = LogUtil.getLogger(classOf[KafkaSource]) +} + +/** + * Contains implementation for Kafka source connectors, users should use + * [[org.apache.gearpump.streaming.kafka.KafkaSource]]. + * + * This is a TimeReplayableSource which is able to replay messages given a start time. + * Each kafka message is tagged with a timestamp by + * [[org.apache.gearpump.streaming.transaction.api.MessageDecoder]] and the (timestamp, offset) + * mapping is stored to a [[org.apache.gearpump.streaming.transaction.api.CheckpointStore]]. + * On recovery, we could retrieve the previously stored offset from the + * [[org.apache.gearpump.streaming.transaction.api.CheckpointStore]] by timestamp and start to read + * from there. 
+ * + * kafka message is wrapped into gearpump [[org.apache.gearpump.Message]] and further filtered by a + * [[org.apache.gearpump.streaming.transaction.api.TimeStampFilter]] + * such that obsolete messages are dropped. + */ +abstract class AbstractKafkaSource( + topic: String, + props: Properties, + kafkaConfigFactory: KafkaConfigFactory, + kafkaClientFactory: KafkaClientFactory, + fetchThreadFactory: FetchThreadFactory) + extends TimeReplayableSource { + import org.apache.gearpump.streaming.kafka.lib.source.AbstractKafkaSource._ + + def this(topic: String, properties: Properties) = { + this(topic, properties, new KafkaConfigFactory, KafkaClient.factory, FetchThread.factory) + } + + private lazy val config: KafkaConfig = kafkaConfigFactory.getKafkaConfig(props) + private lazy val kafkaClient: KafkaClient = kafkaClientFactory.getKafkaClient(config) + private lazy val fetchThread: FetchThread = fetchThreadFactory.getFetchThread(config, kafkaClient) + private lazy val messageDecoder = config.getConfiguredInstance( + KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG, classOf[MessageDecoder]) + private lazy val timestampFilter = config.getConfiguredInstance( + KafkaConfig.TIMESTAMP_FILTER_CLASS_CONFIG, classOf[TimeStampFilter]) + + private var startTime: Long = 0L + private var checkpointStoreFactory: Option[CheckpointStoreFactory] = None + private var checkpointStores: Map[TopicAndPartition, CheckpointStore] = + Map.empty[TopicAndPartition, CheckpointStore] + + override def checkpoint(checkpointStoreFactory: CheckpointStoreFactory): Unit = { + this.checkpointStoreFactory = Some(checkpointStoreFactory) + } + + override def open(context: TaskContext, startTime: TimeStamp): Unit = { + import context.{parallelism, taskId} + + LOG.info("KafkaSource opened at start time {}", startTime) --- End diff -- all right, I missed that part. 
> KafkaSource Stuck > ----------------- > > Key: GEARPUMP-122 > URL: https://issues.apache.org/jira/browse/GEARPUMP-122 > Project: Apache Gearpump > Issue Type: Bug > Components: kafka > Affects Versions: 0.8.0 > Reporter: Qi Shu > Assignee: Manu Zhang > Fix For: 0.8.1 > > Attachments: 2.png, screenshot-1.png > > > Kafka's version is 2.10-0.8.2.0, deployed on a cluster of 3 machines. > Gearpump's version is 2.11-0.8.0, deployed in local mode. > The app running on Gearpump is the Java example Kafka2Kafka. > The topic has millions of messages in it. > After the app had been running for a while, no messages were being received from Kafka, > even though the Kafka console can still receive messages from that topic. > If I use the Kafka source without offset checkpointing, messages begin > to flow. > "screenshot-1.png" shows the app after it had been running for 7 minutes with no messages received > from the Kafka source. > "2.png" is a JProfiler screenshot; I used JProfiler to watch the threads > of Gearpump, and it seems the Kafka source got stuck while doing offset > checkpointing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)