Jovan Kilibarda created FLUME-2809:
--------------------------------------
Summary: sink.kite.DatasetSink MalformedInputException
Key: FLUME-2809
URL: https://issues.apache.org/jira/browse/FLUME-2809
Project: Flume
Issue Type: Bug
Components: Sinks+Sources
Affects Versions: v1.7.0
Environment: Ubuntu VM:
uname -a
Linux ub64-master 3.19.0-25-generic #26~14.04.1-Ubuntu SMP Fri Jul 24 21:16:20 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
Reporter: Jovan Kilibarda
Getting this error when trying to use org.apache.flume.sink.kite.DatasetSink
# apache-flume-1.7.0-SNAPSHOT-bin/logs/flume.log
08 Oct 2015 15:19:39,991 INFO [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:96) - Component type: SOURCE, name: spooldir-src started
08 Oct 2015 15:19:40,183 ERROR [pool-3-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:256) - FATAL: Spool Directory source spooldir-src: { spoolDir: /var/flume/spooldir/ }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
        at java.nio.charset.CoderResult.throwException(CoderResult.java:277)
        at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:282)
        at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:133)
        at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:71)
        at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:90)
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:252)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
08 Oct 2015 15:19:40,550 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.kitesdk.data.spi.filesystem.FileSystemWriter.initialize:147) - Opened output appender AvroAppender{path=file:/home/bob/kite_demo/dns_data/.2d20b2e7-64f2-4d09-87da-aa12120518f7.avro.tmp, schema="bytes", fileSystem=org.apache.hadoop.fs.LocalFileSystem@15e5a95d, enableCompression=true, dataFileWriter=org.apache.avro.file.DataFileWriter@18d223e6, writer=org.apache.avro.reflect.ReflectDatumWriter@c32ddc5} for file:/home/bob/kite_demo/dns_data/2d20b2e7-64f2-4d09-87da-aa12120518f7.avro
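# If I read the stack trace right, the LINE deserializer decodes the spooled
# file as UTF-8 text with a decoder set to REPORT malformed input, and an avro
# container file is binary, so it contains byte sequences that are not valid
# UTF-8. The exception itself is easy to reproduce in isolation (my own sketch,
# not Flume code):
import java.nio.ByteBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

public class DecodeDemo {
  public static void main(String[] args) throws Exception {
    // 0xFF can never appear in well-formed UTF-8; binary avro data is full
    // of such bytes.
    CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
        .onMalformedInput(CodingErrorAction.REPORT);
    // Throws java.nio.charset.MalformedInputException: Input length = 1
    decoder.decode(ByteBuffer.wrap(new byte[] { (byte) 0xFF }));
  }
}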
# Here is how I get into this problem...
# This is the Flume configuration file
# conf/flume.kite.conf
a5.sources = spooldir-src
a5.sinks = sink1
a5.channels = mem-ch1
a5.channels.mem-ch1.type = memory
a5.channels.mem-ch1.capacity = 10000
a5.channels.mem-ch1.transactionCapacity = 1000
a5.sources.spooldir-src.type = spooldir
a5.sources.spooldir-src.spoolDir = /var/flume/spooldir/
a5.sources.spooldir-src.deletePolicy = immediate
a5.sources.spooldir-src.channels = mem-ch1
a5.sources.spooldir-src.selector.type = replicating
a5.sources.spooldir-src.interceptors = i1
a5.sources.spooldir-src.interceptors.i1.type = org.apache.flume.interceptor.ibInterceptor$Builder
a5.sources.spooldir-src.interceptors.i1.preserveExisting = false
a5.sources.spooldir-src.interceptors.i1.header = flume.avro.schema.literal
a5.sources.spooldir-src.interceptors.i1.schema = /var/schema/dns_data.avsc
a5.sinks.sink1.type = org.apache.flume.sink.kite.DatasetSink
a5.sinks.sink1.channel = mem-ch1
a5.sinks.sink1.kite.dataset.uri = dataset:file:/home/bob/kite_demo/dns_data
a5.sinks.sink1.kite.entityParser = avro
cat /var/schema/dns_data.avsc
"bytes"
# Start flume
apache-flume-1.7.0-SNAPSHOT-bin$ ./bin/flume-ng agent -c conf -f conf/flume.kite.conf -n a5
# Receive a text file with one line only
# captured-dns.txt
19-Sep-2015 01:14:23.190 client 172.31.1.130#55282: UDP: query: a1.z1.com IN A response: NOERROR +AEV a1.z1.com. 28800 IN A 1.2.3.4;
# copy it to /home/bob/tmp/captured-dns_filtered
# serialize it using the generic schema
java -jar ~/avro/avro-src-1.7.7/lang/java/tools/target/avro-tools-1.7.7.jar fromtext /home/bob/tmp/captured-dns_filtered /home/bob/tmp/captured-dns_filtered.avro
# the produced avro file looks OK; here is the schema pulled out of it
java -jar ~/avro/avro-src-1.7.7/lang/java/tools/target/avro-tools-1.7.7.jar getschema /home/bob/tmp/captured-dns_filtered.avro
"bytes"
# and the data
java -jar ~/avro/avro-src-1.7.7/lang/java/tools/target/avro-tools-1.7.7.jar totext /home/bob/tmp/captured-dns_filtered.avro -
19-Sep-2015 01:14:23.190 client 172.31.1.130#55282: UDP: query: a1.z1.com IN A response: NOERROR +AEV a1.z1.com. 28800 IN A 1.2.3.4;
# so, move the avro file to the flume spool folder
mv /home/bob/tmp/captured-dns_filtered.avro /var/flume/spooldir/
# and the above log happens.
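# If the LINE deserializer choking on the binary file is the problem, I suspect
# the spooldir source has to be told the input is an avro container file,
# something like the following (untested; deserializer.schemaType = LITERAL is
# my reading of AvroEventDeserializer, which should then put the schema into
# the flume.avro.schema.literal header for me):
a5.sources.spooldir-src.deserializer = AVRO
a5.sources.spooldir-src.deserializer.schemaType = LITERAL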
# Separately, I added an interceptor, which seems to be configured correctly;
# I can see its debug messages when I start flume
/**
 * Only {@link ibInterceptor.Builder} can build me.
 */
private ibInterceptor(boolean preserveExisting, boolean useIP,
                      String header, String schema)
{
  this.preserveExisting = preserveExisting;
  this.header = header;
  this.schema = schema;
  InetAddress addr;
  System.out.println("\n-ibInterceptor header=" + header);
  System.out.println("-ibInterceptor schema=" + schema);
  try {
    addr = InetAddress.getLocalHost();
    if (useIP) {
      host = addr.getHostAddress();
      System.out.println("-ibInterceptor host=" + host);
    } else {
      host = addr.getCanonicalHostName();
    }
  } catch (UnknownHostException e) {
    logger.warn("Could not get local host address. Exception follows.", e);
  }
}
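# For reference, a minimal Builder consistent with the constructor above and
# with the interceptors.i1.* keys in the config (a reconstruction/sketch, not
# my exact code; useIP is hard-coded to false here):
// Assumes imports of org.apache.flume.Context and
// org.apache.flume.interceptor.Interceptor.
public static class Builder implements Interceptor.Builder {
  private boolean preserveExisting;
  private String header;
  private String schema;

  @Override
  public void configure(Context context) {
    // Keys correspond to the interceptors.i1.* settings in the config.
    preserveExisting = context.getBoolean("preserveExisting", false);
    header = context.getString("header", "flume.avro.schema.literal");
    schema = context.getString("schema");
  }

  @Override
  public Interceptor build() {
    return new ibInterceptor(preserveExisting, false, header, schema);
  }
}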
# the intercept() method is never called; the println message at its very
# beginning never gets displayed, though it should based on the Javadoc.
# (Presumably nothing ever reaches the interceptor because the source thread
# dies in the deserializer before any event is created.) Using this same
# custom interceptor with other kinds of sinks, like file_roll, works fine.
/**
 * Modifies events in-place.
 */
@Override
public Event intercept(Event event)
{
  System.out.println("--intercept()");
  if (schema != null) {
    // put the schema literal on the event itself so the sink can see it
    Map<String, String> headers = event.getHeaders();
    String schema_def = "bytes";
    headers.put(header, schema_def);
    System.out.println("-intercept(): schema_def=" + schema_def);
  }
  return event;
}
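# Side note: intercept() hard-codes "bytes" even though the builder is handed a
# schema file path. Loading the literal from the configured path would look
# something like this (hypothetical helper, the name is my own; Java 7 NIO):
// Hypothetical helper, not in the original class. Assumes imports of
// java.io.IOException, java.nio.charset.StandardCharsets,
// java.nio.file.Files and java.nio.file.Paths.
private static String readSchemaLiteral(String path) throws IOException {
  // e.g. /var/schema/dns_data.avsc containing "bytes"
  byte[] raw = Files.readAllBytes(Paths.get(path));
  return new String(raw, StandardCharsets.UTF_8).trim();
}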
# The only purpose of my interceptor is to insert the schema into the flume
# event headers; without it, flume complains:
08 Oct 2015 17:38:41,738 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.kite.policy.RetryPolicy.handle:39) - Event delivery failed: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
08 Oct 2015 17:38:41,738 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
# which makes sense based on the Kite Dataset Sink docs.
Am I doing something wrong?