BigData Workflow Engine for Hadoop, HBase, Netezza, Pig, Hive, Cascalog ...
Two types of triggers are supported: cron triggers, which fire on a schedule, and hdfs triggers, which fire when new files appear under a monitored path (the hdfs-dir variant watches directories only).
For cron expression syntax see the quartz-scheduler documentation.
Triggers require three tables to exist in MySQL:
describe unittriggers;
+---------+--------------+------+-----+---------+----------------+
| Field   | Type         | Null | Key | Default | Extra          |
+---------+--------------+------+-----+---------+----------------+
| id      | int(11)      | NO   | PRI | NULL    | auto_increment |
| unit    | varchar(100) | YES  |     | NULL    |                |
| type    | varchar(10)  | YES  |     | NULL    |                |
| data    | varchar(100) | YES  |     | NULL    |                |
| lastrun | date         | YES  |     | NULL    |                |
+---------+--------------+------+-----+---------+----------------+
describe hdfsfiles;
+-------+---------------+------+-----+---------+----------------+
| Field | Type          | Null | Key | Default | Extra          |
+-------+---------------+------+-----+---------+----------------+
| id    | int(11)       | NO   | PRI | NULL    | auto_increment |
| path  | varchar(1000) | NO   | UNI | NULL    |                |
| ts    | bigint(20)    | NO   | MUL | 0       |                |
| seen  | tinyint(4)    | YES  | MUL | 0       |                |
+-------+---------------+------+-----+---------+----------------+
describe unitfiles;
+--------+-------------+------+-----+---------+-------+
| Field  | Type        | Null | Key | Default | Extra |
+--------+-------------+------+-----+---------+-------+
| unitid | int(11)     | YES  | MUL | NULL    |       |
| fileid | int(11)     | YES  |     | NULL    |       |
| status | varchar(10) | YES  |     | NULL    |       |
+--------+-------------+------+-----+---------+-------+
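A minimal DDL sketch for these tables, reconstructed from the describe output above (index names are illustrative, not taken from the Glue sources):

```sql
-- sketch only: column types follow the describe output; key names are made up
CREATE TABLE unittriggers (
  id      INT          NOT NULL AUTO_INCREMENT PRIMARY KEY,
  unit    VARCHAR(100),
  type    VARCHAR(10),
  data    VARCHAR(100),
  lastrun DATE
);

CREATE TABLE hdfsfiles (
  id   INT           NOT NULL AUTO_INCREMENT PRIMARY KEY,
  path VARCHAR(1000) NOT NULL UNIQUE,
  ts   BIGINT        NOT NULL DEFAULT 0,
  seen TINYINT       DEFAULT 0,
  KEY idx_ts (ts),
  KEY idx_seen (seen)
);

CREATE TABLE unitfiles (
  unitid INT,
  fileid INT,
  status VARCHAR(10),
  KEY idx_unitid (unitid)
);
```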
The tables unitfiles and hdfsfiles maintain the HDFS polling state, and the unittriggers table contains the different triggers for each workflow.
For example, the unittriggers table might contain entries such as:
+----+---------+----------+---------------+---------+
| id | unit    | type     | data          | lastrun |
+----+---------+----------+---------------+---------+
|  1 | test    | hdfs     | /logs/test    | NULL    |
|  2 | test1   | hdfs     | /logs/a       | NULL    |
|  3 | test1   | hdfs     | /logs/b       | NULL    |
|  4 | test1   | hdfs-dir | /logs/c       | NULL    |
|  5 | mytest2 | cron     | 0 0/5 * * * ? | NULL    |
+----+---------+----------+---------------+---------+
Note: Using hdfs-dir instead of hdfs checks only directories for modification, i.e. if you only need to know when directory partitions have changed, rather than tracking individual files, use the type 'hdfs-dir'.
Different HDFS paths can be defined for the same workflow. A unique id is assigned to each unit, data combination, and it is this id that is used as unitid in the unitfiles table to match a file path (fileid in hdfsfiles) to a unit's execution.
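Given the schema above, triggers can be registered with plain INSERT statements; the unit names and paths below are illustrative only:

```sql
-- register an hdfs trigger and a cron trigger for two hypothetical workflows
INSERT INTO unittriggers (unit, type, data) VALUES ('mywf',   'hdfs', '/logs/mywf');
INSERT INTO unittriggers (unit, type, data) VALUES ('mycron', 'cron', '0 0/5 * * * ?');
```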
The folder entries in the data column should not have overlapping files, otherwise the workflow would receive the same file twice in the ready files, because two distinct entries would be made in the unitfiles table.
Wrong way, for unit myunit:
data = /log/a
data = /log/a
Correct way, for unit myunit:
data = /log/a
data = /log/b
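One way to check for such accidental duplicate subscriptions is a GROUP BY query over unittriggers (a sketch, assuming the schema shown earlier):

```sql
-- list unit, data combinations that are registered more than once
SELECT unit, data, COUNT(*) AS n
FROM unittriggers
GROUP BY unit, data
HAVING COUNT(*) > 1;
```

Note this only catches exact duplicates; overlapping but non-identical paths (e.g. a parent and a child directory) still have to be checked by hand.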
The module is configured, like all other Glue modules, in the file /opt/glue/conf/modules.groovy.
Properties are:

name                  description
connection.username   database user name
connection.password   database password
connection.driver     jdbc driver class
connection.url        jdbc connection url
triggerStore2{
  className="org.glue.trigger.service.hdfs.TriggerStore2Module"
  isSingleton="true"
  config{
    triggerStore{
      className='org.glue.trigger.persist.db.DBTriggerStore2'
      config{
        connection.username="glue"
        connection.password="glue"
        connection.driver="com.mysql.jdbc.Driver"
        connection.url="jdbc:mysql://localhost:3306/glue"
      }
    }
  }
}
//mark a list of file ids as processed; they will no longer appear in listReadyFiles
def fileIds = []
context.triggerStore2.markFilesAsProcessed( fileIds )

//iterate over the ready files with a closure
context.triggerStore2.listReadyFiles { int fileId, String path ->
  //do something
}

//or retrieve all ready files at once
def ready_files = context.triggerStore2.listReadyFiles()
//do something with the ready files; each item is a vector of [id path] e.g. [[1 "a/b"] [2 "c/d"]]
tasks{

  prepareFiles{
    tasks = { ctx ->
      //regex to extract values out of the partition path
      def DATE = /hr=(\d{8})/
      def dateSet = new HashSet()
      def fileIds = []
      //here we iterate over all of the files newly seen for this workflow
      //in the directories it is subscribed to in the glue unittriggers table
      ctx.triggerStore2.listReadyFiles { int fileId, String path ->
        fileIds << fileId
        //extract year, month, day
        def m = path =~ DATE
        if(m.size() < 1 || m[0].size() < 2) return //skip if none found
        if(m[0][1])
          dateSet << m[0][1] //now we should have something like yyyyMMdd
      }
      ctx.fileIds = fileIds
      ctx.dateSet = dateSet
    }
  }

  exec{
    dependencies = "prepareFiles"
    tasks = { ctx ->
      //here we have the set of dates that have been updated
      def dateSet = ctx.dateSet
    }
  }

  cleanup{
    dependencies = "exec"
    tasks = { ctx ->
      //only a workflow's logic can know when it has completed processing a file
      //this method marks the files as processed; they will not appear again
      //in the listReadyFiles method
      if(ctx.fileIds) ctx.triggerStore2.markFilesAsProcessed( ctx.fileIds )
    }
  }
}
To re-run a workflow for files that have already been processed (e.g. for a specific partition), set their status back to "ready" in MySQL:

update unitfiles dest,
  (select unitid, fileid, status
     from hdfsfiles, unitfiles
    where unitid in ($list-of-unitids-from-unittriggers)
      and fileid = id
      and path like '%year=$yearPartition/month=$month/day=$day%') src
   set dest.status = "ready"
 where dest.status = "processed"
   and dest.unitid = src.unitid
   and dest.fileid = src.fileid;
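As a concrete, purely hypothetical illustration, with unit ids 2 and 3 and the partition year=2012/month=01/day=15 substituted in, the statement would look like:

```sql
-- re-mark files of units 2 and 3 for the 2012-01-15 partition as ready
update unitfiles dest,
  (select unitid, fileid, status
     from hdfsfiles, unitfiles
    where unitid in (2, 3)
      and fileid = id
      and path like '%year=2012/month=01/day=15%') src
   set dest.status = "ready"
 where dest.status = "processed"
   and dest.unitid = src.unitid
   and dest.fileid = src.fileid;
```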