Glue

BigData Workflow Engine for Hadoop, HBase, Netezza, Pig, Hive, Cascalog ...

Hadoop Module API

The Hadoop Module can be used to connect to different clusters, i.e. you can interact with more than one cluster. In all methods the clusterName argument can be omitted to refer to the default configured cluster.
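
For example, calling a method without a cluster name uses the default cluster, while passing a cluster name as the first argument targets that cluster. A minimal Groovy sketch, assuming a second cluster has been configured under the hypothetical name 'cluster2':

println ctx.hdfs.cat('myfile.txt')             // uses the default configured cluster
println ctx.hdfs.cat('cluster2', 'myfile.txt') // targets the cluster configured as 'cluster2' (hypothetical name)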

Class: HDFSModuleImpl

Groovy examples. Each entry lists the method signature and description, followed by an example.
cat(clusterName:String = null, path:String):String Returns the text content of the file at the given path
println ctx.hdfs.cat('myfile.txt')
eachLine(clusterName:String = null, path:String) Recursively searches for non-hidden files in a directory and reads each line; if a file is compressed it is decompressed first, and each line is passed to the closure
ctx.hdfs.eachLine 'myfile.txt', { line -> println line }
withDecompressedInputStream(clusterName:String = null, file:String) Reads the file; if it is compressed, the Hadoop-configured codec is used to decompress it and a CompressionInputStream is passed to the closure
ctx.hdfs.withDecompressedInputStream "myfile.gz", { input ->   }
list(clusterName:String = null, path:String) Recursively lists all non-hidden files
ctx.hdfs.list 'mydir', { file -> println file }
put(clusterName:String = null, src:String, dest:String) Loads a local file to HDFS
ctx.hdfs.put('myfile.txt', '/hdfsdir/')
get(clusterName:String = null, hdfsSrc:String, localDest:String) Download a file from HDFS to local
ctx.hdfs.get('/hdfsdir/myfile.txt', 'localfile.txt')
delete(clusterName:String = null, file:String, recursive:boolean) Deletes a file, or a directory if recursive is true (see the sketch after this table)
exist(clusterName:String = null, file:String):boolean Returns true if the file or directory exists
ctx.hdfs.exist('mydir')
mkdirs(clusterName:String = null, file:String) Creates the directory and any missing parent directories in the path
ctx.hdfs.mkdirs('/path1/path2/path2')
isDirectory(clusterName:String = null, file:String):boolean Returns true if the path is a directory
isFile(clusterName:String = null, file:String):boolean Returns true if the path is a file
move(clusterName:String = null, src:String, dest:String) Moves a file from one HDFS location to another HDFS location
ctx.hdfs.move('/hdfsdir1/myfile1.txt', '/hdfsdir2/')
open(clusterName:String = null, file:String):FSDataInputStream Opens an InputStream for reading data from an HDFS file
open(clusterName:String = null, file:String, closure) Opens an InputStream for reading data from an HDFS file and passes it to the closure, e.g. write to a local file: new File('localfile').withOutputStream { out -> ctx.hdfs.open 'myfile.txt', { is -> out << is } }
create(clusterName:String = null, file:String):FSDataOutputStream Creates a new file on HDFS
unTar(localFile:String, localDir:String) Extracts the contents of a TAR file
ctx.hdfs.unTar('myfile.tar', "/opt/glue/log/${ctx.unitId}/tardir")
unZip(localFile:String, zipDir:String) Unzips a ZIP/GZIP file
ctx.hdfs.unZip('myfile.gzip', "/opt/glue/log/${ctx.unitId}/zipdir")
getFileStatus(clusterName:String = null, file:String):FileStatus Returns the HDFS File Status
setTimes(clusterName:String = null, file:String, mtime:long, atime:long) Calls the FileSystem.setTimes method on a file
setOwner(clusterName:String = null, file:String, username:String, groupname:String) Calls the FileSystem.setOwner method on a file
setPermissions(clusterName:String = null, file:String, unixStylePermissions:String) Calls the FileSystem.setPermission method on a file
getContentSummary(clusterName:String = null, file:String):ContentSummary Returns the content summary of a file or directory
getDefaultBlockSize(clusterName:String = null):long Returns the default block size
getDefaultReplication(clusterName:String = null):short Returns the default replication
timeSeries(n:String, tableHDFSDir:String, nowdate:Date, modifyTime:String, partitionFormatter:Closure, dateIncrement:Closure, collector:Collector = null) Iterates from nowdate-n (e.g. nowdate - 2.hours) by the date increment over a series of date-partitioned HDFS files and returns true if all of the partitions have files and none have been modified within the specified amount of time
downloadChunked(clusterName:String = null, hdfsDir:Collection, localDir:String, chunkSize:int = 1073741824, compression:String = "gz", callBack:Closure) Performs an eachLine on each of the directories in hdfsDir, writing the data (compressed) to a local part file; when the chunk size is reached the file is rolled and its file name is sent to the callBack closure (see the sketch at the end of this page)
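
Several of the methods above (delete, isDirectory, getContentSummary, setOwner, setPermissions, getDefaultBlockSize) have no inline example. A minimal Groovy sketch, assuming a hypothetical '/data/old' directory and an 'etl' user in the 'hadoop' group:

if (ctx.hdfs.isDirectory('/data/old')) {
    def summary = ctx.hdfs.getContentSummary('/data/old') // ContentSummary of the directory
    println "bytes used: ${summary.length}"
    ctx.hdfs.delete('/data/old', true)                    // recursive delete
}
ctx.hdfs.mkdirs('/data/new')
ctx.hdfs.setOwner('/data/new', 'etl', 'hadoop')           // username, groupname
ctx.hdfs.setPermissions('/data/new', '755')               // unix style permission string (octal form assumed)
println ctx.hdfs.getDefaultBlockSize()                    // default block size of the default cluster
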
Clojure examples. The methods are the same as above; each entry lists the method signature and description, followed by a Clojure example.
cat(clusterName:String = null, path:String):String Returns the text content of the file at the given path
(ctx-hdfs cat "myfile.txt")
eachLine(clusterName:String = null, path:String) Recursively searches for non-hidden files in a directory and reads each line; if a file is compressed it is decompressed first, and each line is passed to the closure
(ctx-hdfs eachLine "myfile.txt" (fn [line] (prn line)))
seq_eachLine(clusterName:String = null, path:String):Collection Same as eachLine but returns a lazy sequence
(def lines (map str (ctx-hdfs seq_eachLine "myfile.txt")))
withDecompressedInputStream(clusterName:String = null, file:String) Reads the file; if it is compressed, the Hadoop-configured codec is used to decompress it and a CompressionInputStream is passed to the closure
(ctx-hdfs withDecompressedInputStream "myfile.gz" (fn [input]   ))
list(clusterName:String = null, path:String) Recursively lists all non-hidden files
(ctx-hdfs list "mydir" (fn [file] (prn file )))
seq_list(clusterName:String = null, path:String):Collection Returns a list of files, iterating recursively through all directories
(def files (ctx-hdfs seq_list "mydir"))
put(clusterName:String = null, src:String, dest:String) Loads a local file to HDFS
(ctx-hdfs put "myfile.txt"  "/hdfsdir/")
get(clusterName:String = null, hdfsSrc:String, localDest:String) Download a file from HDFS to local
(ctx-hdfs get "/hdfsdir/myfile.txt" "localfile.txt")
delete(clusterName:String = null, file:String, recursive:boolean) Deletes a file, or a directory if recursive is true
exist(clusterName:String = null, file:String):boolean Returns true if the file or directory exists
(def exists (ctx-hdfs exist "mydir"))
mkdirs(clusterName:String = null, file:String) Creates the directory and any missing parent directories in the path
(ctx-hdfs mkdirs "/path1/path2/path2")
isDirectory(clusterName:String = null, file:String):boolean Returns true if the path is a directory
isFile(clusterName:String = null, file:String):boolean Returns true if the path is a file
move(clusterName:String = null, src:String, dest:String) Moves a file from one HDFS location to another HDFS location
(ctx-hdfs move "/hdfsdir1/myfile1.txt" "/hdfsdir2/")
open(clusterName:String = null, file:String):FSDataInputStream Opens an InputStream for reading data from an HDFS file (def input (ctx-hdfs open "myfile"))
open(clusterName:String = null, file:String, closure) Opens an InputStream for reading data from an HDFS file and passes it to the closure
create(clusterName:String = null, file:String):FSDataOutputStream Creates a new file on HDFS
unTar(localFile:String, localDir:String) Extracts the contents of a TAR file
(ctx-hdfs unTar "myfile.tar" (str "/opt/glue/log/" (.getUnitId ctx) "/tardir"))
unZip(localFile:String, zipDir:String) Unzips a ZIP/GZIP file
(ctx-hdfs unZip "myfile.gzip" (str "/opt/glue/log/" (.getUnitId ctx) "/zipdir"))
getFileStatus(clusterName:String = null, file:String):FileStatus Returns the HDFS File Status
setTimes(clusterName:String = null, file:String, mtime:long, atime:long) Calls the FileSystem.setTimes method on a file
setOwner(clusterName:String = null, file:String, username:String, groupname:String) Calls the FileSystem.setOwner method on a file
setPermissions(clusterName:String = null, file:String, unixStylePermissions:String) Calls the FileSystem.setPermission method on a file
getContentSummary(clusterName:String = null, file:String):ContentSummary Returns the content summary of a file or directory
getDefaultBlockSize(clusterName:String = null):long Returns the default block size
getDefaultReplication(clusterName:String = null):short Returns the default replication
timeSeries(n:String, tableHDFSDir:String, nowdate:Date, modifyTime:String, partitionFormatter:Closure, dateIncrement:Closure, collector:Collector = null) Iterates from nowdate-n (e.g. nowdate - 2.hours) by the date increment over a series of date-partitioned HDFS files and returns true if all of the partitions have files and none have been modified within the specified amount of time
downloadChunked(clusterName:String = null, hdfsDir:Collection, localDir:String, chunkSize:int = 1073741824, compression:String = "gz", callBack:Closure) Performs an eachLine on each of the directories in hdfsDir, writing the data (compressed) to a local part file; when the chunk size is reached the file is rolled and its file name is sent to the callBack closure (see the sketch at the end of this page)
Python (Jython) examples. The methods are the same as above; each entry lists the method signature and description, followed by a Python example.
cat(clusterName:String = null, path:String):String Returns the text content of the file at the given path
print(str(ctx.hdfs().cat('myfile.txt')))
eachLine(clusterName:String = null, path:String) Recursively searches for non-hidden files in a directory and reads each line; if a file is compressed it is decompressed first, and each line is passed to the closure
def lineHandler(line):
    print(str(line))


ctx.hdfs().eachLine("myfile.txt", lineHandler)
seq_eachLine(clusterName:String = null, path:String) Same as eachLine but returns a lazy sequence of lines
for line in ctx.hdfs().seq_eachLine("myfile.txt"):
    print(str(line))
withDecompressedInputStream(clusterName:String = null, file:String) Reads the file; if it is compressed, the Hadoop-configured codec is used to decompress it and a CompressionInputStream is passed to the closure
def compRead(input):
    #handle the input stream


ctx.hdfs().withDecompressedInputStream("myfile.gz", compRead)
list(clusterName:String = null, path:String) Recursively lists all non-hidden files
def prnFile(file):
    print(str(file))

ctx.hdfs().list("mydir", prnFile)
seq_list(clusterName:String = null, path:String):Collection Recursively lists all non-hidden files and returns a collection
for file in ctx.hdfs().seq_list("mydir"):
    print(str(file))
put(clusterName:String = null, src:String, dest:String) Loads a local file to HDFS
ctx.hdfs().put('myfile.txt', '/hdfsdir/')
get(clusterName:String = null, hdfsSrc:String, localDest:String) Download a file from HDFS to local
ctx.hdfs().get('/hdfsdir/myfile.txt', 'localfile.txt')
delete(clusterName:String = null, file:String, recursive:boolean) Deletes a file, or a directory if recursive is true
exist(clusterName:String = null, file:String):boolean Returns true if the file or directory exists
ctx.hdfs().exist('mydir')
mkdirs(clusterName:String = null, file:String) Creates the directory and any missing parent directories in the path
ctx.hdfs().mkdirs('/path1/path2/path2')
isDirectory(clusterName:String = null, file:String):boolean Returns true if the path is a directory
isFile(clusterName:String = null, file:String):boolean Returns true if the path is a file
move(clusterName:String = null, src:String, dest:String) Moves a file from one HDFS location to another HDFS location
ctx.hdfs().move('/hdfsdir1/myfile1.txt', '/hdfsdir2/')
open(clusterName:String = null, file:String):FSDataInputStream Opens an InputStream for reading data from an HDFS file
open(clusterName:String = null, file:String, closure) Opens an InputStream for reading data from an HDFS file and passes it to the closure
create(clusterName:String = null, file:String):FSDataOutputStream Creates a new file on HDFS
unTar(localFile:String, localDir:String) Extracts the contents of a TAR file
ctx.hdfs().unTar('myfile.tar', '/opt/glue/log/' + str(ctx.getUnitId()) + '/tardir')
unZip(localFile:String, zipDir:String) Unzips a ZIP/GZIP file
ctx.hdfs().unZip('myfile.gzip', '/opt/glue/log/' + str(ctx.getUnitId()) + '/zipdir')
getFileStatus(clusterName:String = null, file:String):FileStatus Returns the HDFS File Status
setTimes(clusterName:String = null, file:String, mtime:long, atime:long) Calls the FileSystem.setTimes method on a file
setOwner(clusterName:String = null, file:String, username:String, groupname:String) Calls the FileSystem.setOwner method on a file
setPermissions(clusterName:String = null, file:String, unixStylePermissions:String) Calls the FileSystem.setPermission method on a file
getContentSummary(clusterName:String = null, file:String):ContentSummary Returns the content summary of a file or directory
getDefaultBlockSize(clusterName:String = null):long Returns the default block size
getDefaultReplication(clusterName:String = null):short Returns the default replication
timeSeries(n:String, tableHDFSDir:String, nowdate:Date, modifyTime:String, partitionFormatter:Closure, dateIncrement:Closure, collector:Collector = null) Iterates from nowdate-n (e.g. nowdate - 2.hours) by the date increment over a series of date-partitioned HDFS files and returns true if all of the partitions have files and none have been modified within the specified amount of time
downloadChunked(clusterName:String = null, hdfsDir:Collection, localDir:String, chunkSize:int = 1073741824, compression:String = "gz", callBack:Closure) Performs an eachLine on each of the directories in hdfsDir, writing the data (compressed) to a local part file; when the chunk size is reached the file is rolled and its file name is sent to the callBack closure (see the sketch below)
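
downloadChunked has no inline example in any of the tables above. The following Groovy sketch illustrates only the documented signature; the directory names, the 512MB chunk size and the behaviour of the callBack closure (assumed to receive the name of each rolled part file) are assumptions:

ctx.hdfs.downloadChunked(
    ['/hdfsdir1', '/hdfsdir2'],                         // hdfsDir: HDFS directories to read with eachLine
    '/tmp/chunks',                                      // localDir: where the local part files are written
    536870912,                                          // chunkSize: roll the part file every 512MB
    'gz',                                               // compression used for the local part files
    { fileName -> println "chunk ready: ${fileName}" }) // callBack: called with each rolled file name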