[ 
https://issues.apache.org/jira/browse/GEARPUMP-122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15303673#comment-15303673
 ] 

ASF GitHub Bot commented on GEARPUMP-122:
-----------------------------------------

Github user huafengw commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/25#discussion_r64864833
  
    --- Diff: 
external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
 ---
    @@ -0,0 +1,173 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.gearpump.streaming.kafka.lib.source
    +
    +import java.util.Properties
    +
    +import com.twitter.bijection.Injection
    +import kafka.common.TopicAndPartition
    +import org.apache.gearpump.streaming.kafka.KafkaSource
    +import 
org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread.FetchThreadFactory
    +import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
    +import KafkaClient.KafkaClientFactory
    +import 
org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, 
FetchThread}
    +import 
org.apache.gearpump.streaming.kafka.lib.source.grouper.PartitionGrouper
    +import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
    +import org.apache.gearpump.streaming.kafka.util.KafkaConfig
    +import 
org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory
    +import org.apache.gearpump.streaming.task.TaskContext
    +import org.apache.gearpump.streaming.transaction.api._
    +import org.apache.gearpump.util.LogUtil
    +import org.apache.gearpump.{Message, TimeStamp}
    +import org.slf4j.Logger
    +
    +object AbstractKafkaSource {
    +  private val LOG: Logger = LogUtil.getLogger(classOf[KafkaSource])
    +}
    +
    +/**
    + * Contains implementation for Kafka source connectors, users should use
    + * [[org.apache.gearpump.streaming.kafka.KafkaSource]].
    + *
    + * This is a TimeReplayableSource which is able to replay messages given a 
start time.
    + * Each kafka message is tagged with a timestamp by
    + * [[org.apache.gearpump.streaming.transaction.api.MessageDecoder]] and 
the (timestamp, offset)
    + * mapping is stored to a 
[[org.apache.gearpump.streaming.transaction.api.CheckpointStore]].
    + * On recovery, we could retrieve the previously stored offset from the
    + * [[org.apache.gearpump.streaming.transaction.api.CheckpointStore]] by 
timestamp and start to read
    + * from there.
    + *
    + * kafka message is wrapped into gearpump [[org.apache.gearpump.Message]] 
and further filtered by a
    + * [[org.apache.gearpump.streaming.transaction.api.TimeStampFilter]]
    + * such that obsolete messages are dropped.
    + */
    +abstract class AbstractKafkaSource(
    +    topic: String,
    +    props: Properties,
    +    kafkaConfigFactory: KafkaConfigFactory,
    +    kafkaClientFactory: KafkaClientFactory,
    +    fetchThreadFactory: FetchThreadFactory)
    +  extends TimeReplayableSource {
    +  import 
org.apache.gearpump.streaming.kafka.lib.source.AbstractKafkaSource._
    +
    +  def this(topic: String, properties: Properties) = {
    +    this(topic, properties, new KafkaConfigFactory, KafkaClient.factory, 
FetchThread.factory)
    +  }
    +
    +  private lazy val config: KafkaConfig = 
kafkaConfigFactory.getKafkaConfig(props)
    +  private lazy val kafkaClient: KafkaClient = 
kafkaClientFactory.getKafkaClient(config)
    +  private lazy val fetchThread: FetchThread = 
fetchThreadFactory.getFetchThread(config, kafkaClient)
    +  private lazy val messageDecoder = config.getConfiguredInstance(
    +    KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG, classOf[MessageDecoder])
    +  private lazy val timestampFilter = config.getConfiguredInstance(
    +    KafkaConfig.TIMESTAMP_FILTER_CLASS_CONFIG, classOf[TimeStampFilter])
    +
    +  private var startTime: Long = 0L
    +  private var checkpointStoreFactory: Option[CheckpointStoreFactory] = None
    +  private var checkpointStores: Map[TopicAndPartition, CheckpointStore] =
    +    Map.empty[TopicAndPartition, CheckpointStore]
    +
    +  override def checkpoint(checkpointStoreFactory: CheckpointStoreFactory): 
Unit = {
    +    this.checkpointStoreFactory = Some(checkpointStoreFactory)
    +  }
    +
    +  override def open(context: TaskContext, startTime: TimeStamp): Unit = {
    +    import context.{parallelism, taskId}
    +
    +    LOG.info("KafkaSource opened at start time {}", startTime)
    --- End diff --
    
    all right, I missed that part.


> KafkaSource Stuck
> -----------------
>
>                 Key: GEARPUMP-122
>                 URL: https://issues.apache.org/jira/browse/GEARPUMP-122
>             Project: Apache Gearpump
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.8.0
>            Reporter: Qi Shu
>            Assignee: Manu Zhang
>             Fix For: 0.8.1
>
>         Attachments: 2.png, screenshot-1.png
>
>
> Kafka's version is 2.10-0.8.2.0 and deployed on a cluster of 3 machines. 
> Gearpump's version is 2.11-0.8.0 and deployed on local mode.
> The app running on gearpump is a java example Kafka2Kafka.
> The topic has millions of messages in It。
> After app started for a while, there were no messages received from kafka, 
> but kafka console can recevie messages from that topic.
> And if I use the kafka source without offset checkpoint, then messages begin 
> to flow.
> "screenshot-1.png" means app started for 7 minutes  and no messages received 
> from kafka source.
> "2.png" was a screen shot of JProfiler, I use JProfiler to watch the thread 
> of Gearpump, and it seems the kafka source stucked when doing offset 
> checkpoint.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to