Today we are going to talk about Morphlines, an open source framework developed by Cloudera that provides a new way to do ETL on Hadoop.
What are these morphlines?
Morphlines are simple configuration files that define how to transform data on the fly. A morphline file describes the steps a data flow has to pass through to reach its destination; these steps define how to transform, create, or drop any field in the data event received. There are many predefined steps, and we can also write our own and plug them into the file easily if we need something new.
Where to use it?
Morphlines are primarily intended for real-time use cases in combination with Flume, but they also integrate with MapReduce if we want to use them in batch mode (CrunchIndexerTool, MapReduceIndexerTool and HBaseMapReduceIndexerTool).
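As a sketch of the Flume integration, a morphline can be attached to Flume's MorphlineSolrSink through the agent configuration. The property names below come from that sink; the agent, sink, and channel names (and the file path) are example values, not from this post:

```properties
# Hypothetical Flume agent wiring; agent1/sink1/channel1 and the path are example names
agent1.sinks.sink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent1.sinks.sink1.morphlineFile = /etc/flume-ng/conf/morphline.conf
agent1.sinks.sink1.morphlineId = morphline1
agent1.sinks.sink1.channel = channel1
```

With this wiring, every Flume event delivered to the sink is run through the commands of `morphline1` before being indexed in Solr.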
In this image we can see where morphlines fit into the Hadoop architecture:
How to use it?
This is a sample morphline configuration file in which we read an event in JSON, then parse, clean, log, and index it in Solr.
```
SOLR_LOCATOR : {
  # Name of solr collection
  collection : collection1

  # ZooKeeper ensemble
  zkHost : "127.0.0.1:2181/solr"
}

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]

    commands : [
      # Read the input as JSON and emit one record per JSON record
      { readJson {} }

      # Convert the JSON record to a record with name, age and gender fields
      {
        extractJsonPaths {
          flatten : true
          paths : {
            name : /name
            age : /person/age
            gender : /person/gender
          }
        }
      }

      # Special command to remove unknown fields and prevent Solr errors
      { sanitizeUnknownSolrFields { solrLocator : ${SOLR_LOCATOR} } }

      # Log the output record in debug mode
      { logDebug { format : "output record: {}", args : ["@{}"] } }

      # Load into the Solr collection
      { loadSolr { solrLocator : ${SOLR_LOCATOR} } }
    ]
  }
]
```
As we can see in the code, all the stages are chained, each doing its part, until the loadSolr command indexes the record in Solr.
This makes an excellent fit for real-time ingestion with a Solr sink, as you can see in the series of posts we've published, Ingest & Search JSON events in Real Time. But we could also use it with MapReduceIndexerTool or CrunchIndexerTool.
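For the batch mode, MapReduceIndexerTool takes the morphline file as a parameter and applies it to every input file before indexing. A hedged sketch of an invocation (the jar name and HDFS paths are placeholders, not from this post):

```
hadoop jar search-mr-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  --morphline-file morphline.conf \
  --output-dir hdfs://nameservice1/user/etl/outdir \
  --zk-host 127.0.0.1:2181/solr \
  --go-live \
  hdfs://nameservice1/user/etl/indir
```

The same morphline file can thus serve both the Flume real-time path and the MapReduce batch path.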
Interesting Morphlines Commands
In the Morphlines Reference Guide you can see all the available commands, but we are going to highlight some of the most interesting ones.
- readCSV or readJson: read CSV or JSON input and emit one record per line or JSON object.
```
# Read the CSV records and emit one record per line with fields Name, Age and Gender
readCSV {
  separator : ","
  columns : [Name, Age, Gender]
  ignoreFirstLine : false
  charset : UTF-8
}
```
- addValues, addValuesIfAbsent, removeFields: add or remove fields from the records.
```
# Put the auxname value in name if it doesn't exist
addValuesIfAbsent {
  name : "@{auxname}"
}

# Remove the field auxname from records
removeFields {
  blacklist : ["auxname"]
}
```
- addCurrentTime, convertTimestamp: time functions so we can add and convert timestamps on the records.
```
# Create the event_datetime field in the input record and put the current time in it
{ addCurrentTime { field : event_datetime } }

# Convert the date in the event_datetime field to "yyyy-MM-dd" in the Madrid timezone
{
  convertTimestamp {
    field : event_datetime
    inputFormats : ["unixTimeInMillis"]
    outputFormat : "yyyy-MM-dd"
    outputTimezone : Europe/Madrid
  }
}
```
- contains, equals, if, not: we can use conditionals to decide what to do with a record.
```
# Drop the record (don't propagate it) if area is not Sports or Health
{
  if {
    conditions : [
      { not { contains { area : [Sports, Health] } } }
    ]
    then : [
      { dropRecord {} }
    ]
  }
}
```
Custom Commands
One of the most interesting properties of the Morphlines framework is the ability to write your own commands as a step, or to include a Java block in your morphline. You can see in Cloudera's ReadLine source code how easy it is to write a new step.
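A minimal sketch of what such a custom step could look like, using the Kite Morphlines API (a CommandBuilder that registers the command name, plus an AbstractCommand subclass); the `toUpperCase` command, the class names, and the `field` parameter are illustrative assumptions, and the kite-morphlines-core dependency is assumed on the classpath:

```java
package com.example.morphline;

import java.util.Collection;
import java.util.Collections;

import com.typesafe.config.Config;
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.CommandBuilder;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.AbstractCommand;

/** Registers a hypothetical "toUpperCase" command that upper-cases a configurable field. */
public final class ToUpperCaseBuilder implements CommandBuilder {

  @Override
  public Collection<String> getNames() {
    // Name under which the command can be used in a morphline file
    return Collections.singletonList("toUpperCase");
  }

  @Override
  public Command build(Config config, Command parent, Command child, MorphlineContext context) {
    return new ToUpperCase(this, config, parent, child, context);
  }

  private static final class ToUpperCase extends AbstractCommand {
    private final String fieldName;

    ToUpperCase(CommandBuilder builder, Config config, Command parent, Command child,
                MorphlineContext context) {
      super(builder, config, parent, child, context);
      // Read the required "field" parameter from the command's configuration
      this.fieldName = getConfigs().getString(config, "field");
      validateArguments();
    }

    @Override
    protected boolean doProcess(Record record) {
      // Replace the first value of the configured field with its upper-cased form
      Object value = record.getFirstValue(fieldName);
      if (value != null) {
        record.replaceValues(fieldName, value.toString().toUpperCase());
      }
      // Pass the record on to the next command in the chain
      return super.doProcess(record);
    }
  }
}
```

Once the class is on the classpath and covered by `importCommands`, the step could be used in a morphline as `{ toUpperCase { field : name } }`.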
This is an example of a java block included in the morphline file:
```
# Drop the record if it has no name field, or more than one
java {
  imports : "import java.util.*;"
  code : """
    List names = record.get("name");
    if (names.size() != 1) {
      return false;
    }
    return child.process(record);
  """
}
```
Take away
So with these examples we can see how powerful Cloudera's ETL approach, morphlines, is. Hortonworks has gone the other way with NiFi, but that is for another blog post.