BigData Workflow Engine for Hadoop, HBase, Netezza, Pig, Hive, Cascalog ...
The Hadoop module can be used to connect to different clusters, i.e. you can interact with more than one cluster. In all methods the clusterName argument can be omitted to refer to the default configured cluster.
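For example, from a Groovy workflow script (a minimal sketch; the `ctx.hdfs` handle matches the `open` example in the tables below, while the cluster name and paths are illustrative):

```groovy
// Read a file from the default configured cluster (clusterName omitted).
def text = ctx.hdfs.cat('/data/input/part-00000')

// Read the same path from a second, explicitly named cluster
// ('other-cluster' is a hypothetical configured cluster name).
def otherText = ctx.hdfs.cat('other-cluster', '/data/input/part-00000')
```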
Class: HDFSModuleImpl
Method | Description | Example
---|---|---
cat(clusterName:String = null, path:String):String | Returns the text content of the file at a path |
eachLine(clusterName:String = null, path:String) | Recursively searches a directory for non-hidden files and sends each line to the closure; compressed files are decompressed first |
withDecompressedInputStream(clusterName:String = null, file:String) | Reads the file; if it is compressed, the Hadoop-configured codec is used to decompress it and a CompressionInputStream is passed to the closure |
list(clusterName:String = null, path:String) | Recursively lists all non-hidden files |
put(clusterName:String = null, src:String, dest:String) | Uploads a local file to HDFS |
get(clusterName:String = null, hdfsSrc:String, localDest:String) | Downloads a file from HDFS to the local file system |
delete(clusterName:String = null, file:String, recursive:boolean) | Deletes a file, or a directory if recursive is true |
exist(clusterName:String = null, file:String):boolean | Returns true if the file or directory exists |
mkdirs(clusterName:String = null, file:String) | Creates all parent and sub-directories in the path |
isDirectory(clusterName:String = null, file:String):boolean | Returns true if the path is a directory |
isFile(clusterName:String = null, file:String):boolean | Returns true if the path is a file |
move(clusterName:String = null, src:String, dest:String) | Moves a file from one HDFS location to another |
open(clusterName:String = null, file:String):FSDataInputStream | Opens an InputStream for reading an HDFS file |
open(clusterName:String = null, file:String, closure) | Opens an InputStream for reading an HDFS file and passes it to the closure | e.g. write to a local file: `new File('localfile').withOutputStream { out -> ctx.hdfs.open 'myfile.txt', { is -> out << is } }`
create(clusterName:String = null, file:String):FSDataOutputStream | Creates a new file on HDFS |
unTar(localFile:String, localDir:String) | Extracts the contents of a TAR file |
unZip(localFile:String, zipDir:String) | Unzips a ZIP/GZIP file |
getFileStatus(clusterName:String = null, file:String):FileStatus | Returns the HDFS FileStatus |
setTimes(clusterName:String = null, file:String, mtime:long, atime:long) | Calls FileSystem.setTimes on a file |
setOwner(clusterName:String = null, file:String, username:String, groupname:String) | Calls FileSystem.setOwner on a file |
setPermissions(clusterName:String = null, file:String, unixStylePermissions:String) | Calls FileSystem.setPermission on a file |
getContentSummary(clusterName:String = null, file:String):ContentSummary | Returns the content summary of a file or directory |
getDefaultBlockSize(clusterName:String = null):long | Returns the default block size |
getDefaultReplication(clusterName:String = null):short | Returns the default replication |
timeSeries(n:String, tableHDFSDir:String, nowdate:Date, modifyTime:String, partitionFormatter:Closure) | Iterates from nowdate-n (e.g. nowdate - 2.hours) in date increments over a series of date-partitioned HDFS files; returns true if all of the partitions have files and none has been modified within the specified amount of time |
downloadChunked(clusterName:String = null, hdfsDir:Collection, …) | Performs an eachLine on each of the directories in hdfsDir, writing the data (compressed) to a local part file; when the chunk size is reached, the file is rolled and its file name is sent to the callBack closure |
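As a minimal end-to-end sketch of how these methods compose in a Groovy workflow script (the paths are illustrative, and it assumes `eachLine` accepts a trailing closure, as its description above implies):

```groovy
// Upload a local file to HDFS, verify it arrived, stream its lines, then remove it.
ctx.hdfs.put('/tmp/events.log', '/data/incoming/events.log')   // illustrative paths

if (ctx.hdfs.exist('/data/incoming/events.log')) {
    // eachLine decompresses compressed files and hands each line to the closure.
    ctx.hdfs.eachLine('/data/incoming/events.log') { line ->
        println line
    }
    // Delete the single file (recursive = false).
    ctx.hdfs.delete('/data/incoming/events.log', false)
}
```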
Method | Description | Example
---|---|---
cat(clusterName:String = null, path:String):String | Returns the text content of the file at a path |
eachLine(clusterName:String = null, path:String) | Recursively searches a directory for non-hidden files and sends each line to the closure; compressed files are decompressed first |
seq_eachLine(clusterName:String = null, path:String):Collection | Same as eachLine but returns a lazy sequence |
withDecompressedInputStream(clusterName:String = null, file:String) | Reads the file; if it is compressed, the Hadoop-configured codec is used to decompress it and a CompressionInputStream is passed to the closure |
list(clusterName:String = null, path:String) | Recursively lists all non-hidden files |
seq_list(clusterName:String = null, path:String):Collection | Returns a list of files; the method iterates recursively through all directories |
put(clusterName:String = null, src:String, dest:String) | Uploads a local file to HDFS |
get(clusterName:String = null, hdfsSrc:String, localDest:String) | Downloads a file from HDFS to the local file system |
delete(clusterName:String = null, file:String, recursive:boolean) | Deletes a file, or a directory if recursive is true |
exist(clusterName:String = null, file:String):boolean | Returns true if the file or directory exists |
mkdirs(clusterName:String = null, file:String) | Creates all parent and sub-directories in the path |
isDirectory(clusterName:String = null, file:String):boolean | Returns true if the path is a directory |
isFile(clusterName:String = null, file:String):boolean | Returns true if the path is a file |
move(clusterName:String = null, src:String, dest:String) | Moves a file from one HDFS location to another |
open(clusterName:String = null, file:String):FSDataInputStream | Opens an InputStream for reading an HDFS file | `(def input (ctx-hdfs open "myfile"))`
open(clusterName:String = null, file:String, closure) | Opens an InputStream for reading an HDFS file and passes it to the closure |
create(clusterName:String = null, file:String):FSDataOutputStream | Creates a new file on HDFS |
unTar(localFile:String, localDir:String) | Extracts the contents of a TAR file |
unZip(localFile:String, zipDir:String) | Unzips a ZIP/GZIP file |
getFileStatus(clusterName:String = null, file:String):FileStatus | Returns the HDFS FileStatus |
setTimes(clusterName:String = null, file:String, mtime:long, atime:long) | Calls FileSystem.setTimes on a file |
setOwner(clusterName:String = null, file:String, username:String, groupname:String) | Calls FileSystem.setOwner on a file |
setPermissions(clusterName:String = null, file:String, unixStylePermissions:String) | Calls FileSystem.setPermission on a file |
getContentSummary(clusterName:String = null, file:String):ContentSummary | Returns the content summary of a file or directory |
getDefaultBlockSize(clusterName:String = null):long | Returns the default block size |
getDefaultReplication(clusterName:String = null):short | Returns the default replication |
timeSeries(n:String, tableHDFSDir:String, nowdate:Date, modifyTime:String, partitionFormatter:Closure) | Iterates from nowdate-n (e.g. nowdate - 2.hours) in date increments over a series of date-partitioned HDFS files; returns true if all of the partitions have files and none has been modified within the specified amount of time |
downloadChunked(clusterName:String = null, hdfsDir:Collection, …) | Performs an eachLine on each of the directories in hdfsDir, writing the data (compressed) to a local part file; when the chunk size is reached, the file is rolled and its file name is sent to the callBack closure |
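For instance, a minimal sketch of writing a file with `create` and renaming it with `move` (shown in Groovy for consistency with the example in the first table; the paths and content are illustrative, and the caller is assumed to close the returned FSDataOutputStream):

```groovy
// Write to a temporary HDFS path, then move the finished file into place.
def out = ctx.hdfs.create('/data/tmp/report.txt')   // illustrative path
try {
    out.writeBytes('total=42\n')   // FSDataOutputStream extends DataOutputStream
} finally {
    out.close()
}
ctx.hdfs.move('/data/tmp/report.txt', '/data/reports/report.txt')
```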
Method | Description | Example
---|---|---
cat(clusterName:String = null, path:String):String | Returns the text content of the file at a path |
eachLine(clusterName:String = null, path:String) | Recursively searches a directory for non-hidden files and sends each line to the closure; compressed files are decompressed first |
seq_eachLine(clusterName:String = null, path:String) | Same as eachLine but returns a lazy sequence of lines |
withDecompressedInputStream(clusterName:String = null, file:String) | Reads the file; if it is compressed, the Hadoop-configured codec is used to decompress it and a CompressionInputStream is passed to the closure |
list(clusterName:String = null, path:String) | Recursively lists all non-hidden files |
seq_list(clusterName:String = null, path:String):Collection | Recursively lists all non-hidden files and returns a collection |
put(clusterName:String = null, src:String, dest:String) | Uploads a local file to HDFS |
get(clusterName:String = null, hdfsSrc:String, localDest:String) | Downloads a file from HDFS to the local file system |
delete(clusterName:String = null, file:String, recursive:boolean) | Deletes a file, or a directory if recursive is true |
exist(clusterName:String = null, file:String):boolean | Returns true if the file or directory exists |
mkdirs(clusterName:String = null, file:String) | Creates all parent and sub-directories in the path |
isDirectory(clusterName:String = null, file:String):boolean | Returns true if the path is a directory |
isFile(clusterName:String = null, file:String):boolean | Returns true if the path is a file |
move(clusterName:String = null, src:String, dest:String) | Moves a file from one HDFS location to another |
open(clusterName:String = null, file:String):FSDataInputStream | Opens an InputStream for reading an HDFS file |
open(clusterName:String = null, file:String, closure) | Opens an InputStream for reading an HDFS file and passes it to the closure |
create(clusterName:String = null, file:String):FSDataOutputStream | Creates a new file on HDFS |
unTar(localFile:String, localDir:String) | Extracts the contents of a TAR file |
unZip(localFile:String, zipDir:String) | Unzips a ZIP/GZIP file |
getFileStatus(clusterName:String = null, file:String):FileStatus | Returns the HDFS FileStatus |
setTimes(clusterName:String = null, file:String, mtime:long, atime:long) | Calls FileSystem.setTimes on a file |
setOwner(clusterName:String = null, file:String, username:String, groupname:String) | Calls FileSystem.setOwner on a file |
setPermissions(clusterName:String = null, file:String, unixStylePermissions:String) | Calls FileSystem.setPermission on a file |
getContentSummary(clusterName:String = null, file:String):ContentSummary | Returns the content summary of a file or directory |
getDefaultBlockSize(clusterName:String = null):long | Returns the default block size |
getDefaultReplication(clusterName:String = null):short | Returns the default replication |
timeSeries(n:String, tableHDFSDir:String, nowdate:Date, modifyTime:String, partitionFormatter:Closure) | Iterates from nowdate-n (e.g. nowdate - 2.hours) in date increments over a series of date-partitioned HDFS files; returns true if all of the partitions have files and none has been modified within the specified amount of time |
downloadChunked(clusterName:String = null, hdfsDir:Collection, …) | Performs an eachLine on each of the directories in hdfsDir, writing the data (compressed) to a local part file; when the chunk size is reached, the file is rolled and its file name is sent to the callBack closure |
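Finally, a short sketch of the metadata calls (again in Groovy for consistency with the first table; the paths, owner, group, and permission values are illustrative):

```groovy
// Summarise space usage for a directory, then adjust ownership and permissions.
def summary = ctx.hdfs.getContentSummary('/data/reports')   // illustrative path
println "bytes: ${summary.length}, files: ${summary.fileCount}"

ctx.hdfs.setOwner('/data/reports/report.txt', 'etl', 'analytics')
ctx.hdfs.setPermissions('/data/reports/report.txt', '644')

println "default block size: ${ctx.hdfs.getDefaultBlockSize()}"
```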