Ingesting and searching data in real time is one of the most important needs companies have nowadays. Whether you are dealing with IoT or collecting data from a huge number of sources, this post may come in handy. We were looking for a simple way to ingest and store JSON events.
The idea behind this series is to show how to ingest JSON data into a Hadoop cluster, store it in HDFS and search it in real time using Solr or Hive.
Apache Flume is a distributed system for collecting, aggregating and moving large amounts of data from different sources to a centralized data store. We will use Flume in HA mode to ingest a huge amount of JSON events into HDFS. We will not delve into the general details of Flume here.
Apache Flume's HTTP source gives us a good endpoint for data ingestion: the client only has to POST a JSON event to the specified port and Flume will receive these events.
agent.sources.httpSource.type = http
agent.sources.httpSource.port = 5555
agent.sources.httpSource.handler = org.apache.flume.sink.solr.morphline.BlobHandler
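To sketch what a client could look like, here is a minimal Python snippet that builds and posts one event to the agent. The host name and the event fields are hypothetical; only the port comes from the config above.

```python
import json
import urllib.request

# A hypothetical sensor event; with BlobHandler any valid JSON body works.
event = {"sensor": "temp-01", "value": 21.5, "ts": 1456000000}
body = json.dumps(event).encode("utf-8")

req = urllib.request.Request(
    "http://flume-host:5555",   # hypothetical agent host; port from the config above
    data=body,
    headers={"Content-Type": "application/json"},
)
# urllib.request.urlopen(req)  # send it once an agent is actually listening
```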
The first choice we are presented with is BlobHandler vs. JSONHandler. In this case we chose BlobHandler because it is the simplest: we get whatever gets sent. With JSONHandler the client must send JSON with a specific structure of headers and a body (and the body must be a string!), so we'd rather give our sources the freedom to send whatever they want.
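To make the difference concrete, here is a sketch of the two payload shapes (the field names and values are made up): JSONHandler expects a list of events, each with headers and a string body, while BlobHandler takes the raw event as-is.

```python
import json

# JSONHandler envelope: a JSON array of events, each with headers
# and a body that must be a string (even if that string contains JSON).
jsonhandler_payload = json.dumps([
    {"headers": {"source": "sensor-42"}, "body": "{\"value\": 21.5}"}
])

# BlobHandler: the raw JSON event itself is the whole payload.
blobhandler_payload = json.dumps({"value": 21.5})
```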
With this configuration we are able to receive any kind of JSON event in Flume; now we need to store it.
HDFS sink is the obvious choice to store this data in a Hadoop Cluster. After testing a bit, this is the configuration we came up with:
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = /events/%Y/%m/%d
agent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
agent.sinks.hdfsSink.hdfs.inUsePrefix = .
agent.sinks.hdfsSink.hdfs.batchSize = 1000
agent.sinks.hdfsSink.hdfs.rollInterval = 300
agent.sinks.hdfsSink.hdfs.rollSize = 524288000
agent.sinks.hdfsSink.hdfs.rollCount = 0
- Location: with path and useLocalTimeStamp we store the data in per-day directories, so we have all of a day's events together to process later. useLocalTimeStamp saves us from asking the sources to send a timestamp header.
- Size: we control the file size by closing each file every 300 seconds (rollInterval) or at 500 MB (rollSize), whichever comes first. These values are always a tradeoff between getting data into HDFS as soon as possible and producing files big enough to avoid the small-files problem.
- Use: setting inUsePrefix is better than Flume's default .tmp suffix. If you don't change it and launch a MapReduce job over the directory being written, the job will fail if a file closes in the middle of the tasks. With a . prefix, Hadoop treats the in-progress file as hidden and ignores it.
- Format: the simpler, the better. A Sequence File with JSON inside is the simplest and best way to handle this. A JSON event may span multiple lines, so with a plain text format you can't use the end of line as the record delimiter. With a Sequence File, every event is kept separate from the rest.
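A tiny illustration of why newline-delimited text is unsafe here: a pretty-printed JSON event spans several lines, yet it is still a single logical record.

```python
import json

# One event, pretty-printed: it spans several lines, so splitting the
# file on newlines would shred it into broken records.
event = {"sensor": "temp-01", "reading": {"value": 21.5, "unit": "C"}}
multiline = json.dumps(event, indent=2)

print(multiline.count("\n") > 0)       # True: the single event contains newlines
print(json.loads(multiline) == event)  # True: but it is still one logical record
```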
So after this first part we know how to ingest and store JSON events in HDFS; in the next chapter we will discuss how to search this information.