Hello, I am trying to transfer data from my local SQL database table to Phoenix on an EC2 instance. I do not see any errors in the logs, but the data is not inserted or updated into the Phoenix tables. Any help would be really appreciated.

$ bin/flume-ng agent --conf conf --conf-file conf/stsflume-mysql-phoenix.conf --name agent1 -Dflume.root.logger=DEBUG,console
Below is the configuration:

agent1.channels = ph1
agent1.sources = sql-source
agent1.sinks = PHOENIX

agent1.channels.ph1.type = memory
agent1.sources.sql-source.channels = ph1

agent1.sources.sql-source.type = org.keedio.flume.source.SQLSource

# URL to connect to the database (currently only MySQL is supported)
agent1.sources.sql-source.hibernate.connection.url = jdbc:mysql://127.0.0.1:3306/my_schema

# Hibernate database connection properties
agent1.sources.sql-source.hibernate.connection.user = root
agent1.sources.sql-source.hibernate.connection.password = root
agent1.sources.sql-source.hibernate.connection.autocommit = true
agent1.sources.sql-source.hibernate.dialect = org.hibernate.dialect.MySQLDialect
agent1.sources.sql-source.hibernate.connection.driver_class = com.mysql.jdbc.Driver

agent1.sources.sql-source.table = dwki_spelling_stage_summary

# Columns to import (default * imports the entire row)
agent1.sources.sql-source.columns.to.select = "profile_id","class_id","school_id","district_id","time_period_code","spelling_stage"

# Increment column properties
# Increment value is where you want to start reading data from (0 will import the entire table)
agent1.sources.sql-source.incremental.value = 0

# Query delay: the query will be sent every configured millisecond
agent1.sources.sql-source.run.query.delay=100

# Status file is used to save the last read row
agent1.sources.sql-source.status.file.path = /var/lib/flume
agent1.sources.sql-source.status.file.name = sql-source.status

agent1.sinks.PHOENIX.channel = ph1
agent1.sinks.PHOENIX.type = org.apache.phoenix.flume.sink.PhoenixSink
agent1.sinks.PHOENIX.batchSize = 100
agent1.sinks.PHOENIX.zookeeperQuorum=ip-11-46-175-227.ec2.internal
#agent1.sinks.PHOENIX.jdbcUrl=jdbc:phoenix:thin:url=http://ip-11-46-174-220.ec2.internal:8765;serialization=PROTOBUF;authentication=SPNEGO
agent1.sinks.PHOENIX.jdbcUrl=jdbc:phoenix:thin:url=http://ip-11-46-174-220.ec2.internal:8765;serialization=PROTOBUF
agent1.sinks.PHOENIX.table="dwki_spelling_stage_summary1"
agent1.sinks.PHOENIX.serializer=regex
agent1.sinks.PHOENIX.serializer.regex=(.*)
agent1.sinks.PHOENIX.serializer.columns="profile_id","class_id","school_id","district_id","time_period_code","spelling_stage","creation_date","modified_date"

I know it is reading from the source because I can see sql-source.status being updated every time a record is inserted. Is there anything I am missing? In the logs I keep seeing this:

2017-10-19 00:43:49,564 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.phoenix.flume.serializer.RegexEventSerializer.upsertEvents(RegexEventSerializer.java:90)] payload 1 size doesn't match the pattern 6
2017-10-19 00:43:49,564 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.phoenix.flume.serializer.RegexEventSerializer.upsertEvents(RegexEventSerializer.java:90)] payload 1 size doesn't match the pattern 6
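If I read that DEBUG line correctly, the "1" and "6" look like the number of regex capture groups in the payload match versus the number of columns the serializer expects, which would mean my single group (.*) can never cover the configured column list. Below is a minimal, hypothetical sketch of that kind of check (not the Phoenix source, just my understanding; the sample payload format is an assumption):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexGroupCheck {
    public static void main(String[] args) {
        // Pattern taken from the sink config above: a single capture group.
        Pattern pattern = Pattern.compile("(.*)");
        // Hypothetical event body as the SQL source might emit it (CSV-style, an assumption).
        String payload = "\"111\",\"111c\",\"111s\",\"111d\",\"b\",\"A\"";
        String[] columns = {"profile_id", "class_id", "school_id",
                            "district_id", "time_period_code", "spelling_stage"};

        Matcher m = pattern.matcher(payload);
        if (m.matches()) {
            // groupCount() is 1 for "(.*)" while six columns are listed here,
            // which is how I understand the "1 ... 6" in the DEBUG message.
            System.out.println("capture groups: " + m.groupCount()
                    + ", expected columns: " + columns.length);
        }
    }
}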
Below are the table details:

0: jdbc:phoenix:thin:url=http://localhost:876> !describe "dwki_spelling_stage_summary1";
+------------+--------------+-------------------------------+-------------------+------------+------------+--------------+----------------+-----------------+-----------------+-----------+
| TABLE_CAT  | TABLE_SCHEM  | TABLE_NAME                    | COLUMN_NAME       | DATA_TYPE  | TYPE_NAME  | COLUMN_SIZE  | BUFFER_LENGTH  | DECIMAL_DIGITS  | NUM_PREC_RADIX  | NULLABLE  |
+------------+--------------+-------------------------------+-------------------+------------+------------+--------------+----------------+-----------------+-----------------+-----------+
|            |              | dwki_spelling_stage_summary1  | profile_id        | 12         | VARCHAR    | null         | null           | null            | null            | 0         |
|            |              | dwki_spelling_stage_summary1  | class_id          | 12         | VARCHAR    | null         | null           | null            | null            | 1         |
|            |              | dwki_spelling_stage_summary1  | school_id         | 12         | VARCHAR    | null         | null           | null            | null            | 1         |
|            |              | dwki_spelling_stage_summary1  | district_id       | 12         | VARCHAR    | null         | null           | null            | null            | 1         |
|            |              | dwki_spelling_stage_summary1  | time_period_code  | 12         | VARCHAR    | null         | null           | null            | null            | 1         |
|            |              | dwki_spelling_stage_summary1  | spelling_stage    | 12         | VARCHAR    | null         | null           | null            | null            | 1         |
|            |              | dwki_spelling_stage_summary1  | creation_date     | 12         | VARCHAR    | null         | null           | null            | null            | 1         |
|            |              | dwki_spelling_stage_summary1  | modified_date     | 12         | VARCHAR    | null         | null           | null            | null            | 1         |
+------------+--------------+-------------------------------+-------------------+------------+------------+--------------+----------------+-----------------+-----------------+-----------+

Manually adding records works fine:

0: jdbc:phoenix:thin:url=http://localhost:876> UPSERT INTO "dwki_spelling_stage_summary1" ("profile_id", "class_id", "school_id", "district_id", "time_period_code", "spelling_stage") VALUES ('111', '111c', '111s', '111d', 'b', 'A');
1 row affected (0.033 seconds)
0: jdbc:phoenix:thin:url=http://localhost:876> select * from "dwki_spelling_stage_summary1";
+-------------+-----------+------------+--------------+-------------------+-----------------+----------------+----------------+
| profile_id  | class_id  | school_id  | district_id  | time_period_code  | spelling_stage  | creation_date  | modified_date  |
+-------------+-----------+------------+--------------+-------------------+-----------------+----------------+----------------+
| 111         | 111c      | 111s       | 111d         | b                 | A               |                |                |
| 12          |           |            |              |                   |                 |                |                |
+-------------+-----------+------------+--------------+-------------------+-----------------+----------------+----------------+
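For reference, the equivalent upsert done programmatically through the thin-client JDBC URL that the sink points at would look roughly like the sketch below (the driver class name, localhost URL, and sample values are my assumptions, not something taken from the Flume setup):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class PhoenixThinUpsert {
    public static void main(String[] args) throws Exception {
        // Thin-client (Phoenix Query Server) driver class; classpath setup is assumed.
        Class.forName("org.apache.phoenix.queryserver.client.Driver");
        String url = "jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF";
        try (Connection conn = DriverManager.getConnection(url);
             PreparedStatement ps = conn.prepareStatement(
                 "UPSERT INTO \"dwki_spelling_stage_summary1\" "
                 + "(\"profile_id\", \"class_id\", \"school_id\", \"district_id\", "
                 + "\"time_period_code\", \"spelling_stage\") VALUES (?, ?, ?, ?, ?, ?)")) {
            ps.setString(1, "222");
            ps.setString(2, "222c");
            ps.setString(3, "222s");
            ps.setString(4, "222d");
            ps.setString(5, "b");
            ps.setString(6, "B");
            ps.executeUpdate();
            conn.commit(); // commit explicitly in case autocommit is off
        }
    }
}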
