package sos.stacks.ganymed;
import com.trilead.ssh2.SFTPv3Client;
import com.trilead.ssh2.SFTPv3DirectoryEntry;
import com.trilead.ssh2.SFTPv3FileHandle;
import java.io.File;
import java.io.FileOutputStream;
import java.util.Iterator;
import java.util.Vector;
import sos.spooler.Order;
import sos.spooler.Variable_set;
/**
* @author andreas.pueschel@sos-berlin.com
* @author ghassan.beydoun@sos-berlin.com
* $Id: JobSchedulerSFTPReceiveJob.java 3178 2008-01-04 13:49:31Z al $
*
* see job documentation in the package jobdoc for details
*/
public class JobSchedulerSFTPReceiveJob extends JobSchedulerSSHBaseJob {
/** regular expression specifying file names for transfer */
protected String fileSpec = "";
/** local directory for files specified with the attribute fileSpec */
protected String localDir = "";
/** remote directory */
protected String remoteDir = "";
/** Suffix that is added to files during transfer, renaming files after transfer on the target host makes them appear atomically */
protected String atomicSuffix = "";
/** create directories if needed */
protected boolean createDir = true;
/** enable recursive processing of directories */
protected boolean recursive = false;
/** create files with explicit permissions */
protected int permissions = 0;
/** truncate existing files on remote host */
protected boolean truncateFiles = true;
/**
* Processing
*
*/
public boolean spooler_process() {
Order order = null;
Variable_set params = null;
SFTPv3Client sftpClient = null;
int count = 0;
try {
try { // to fetch parameters from orders that have precedence over job parameters
params = spooler_task.params();
if (spooler_task.job().order_queue() != null) {
order = spooler_task.order();
if ( order.params() != null)
params.merge(order.params());
}
// get basic authentication parameters
this.getBaseParameters();
if (params.value("file_spec") != null && params.value("file_spec").length() > 0) {
this.setFileSpec(params.value("file_spec"));
spooler_log.info(".. parameter [file_spec]: " + this.getFileSpec());
} else {
this.setFileSpec("^(.*)$");
}
if (params.value("local_dir") != null && params.value("local_dir").length() > 0) {
this.setLocalDir(this.normalizePath(params.value("local_dir")));
spooler_log.info(".. parameter [local_dir]: " + this.getLocalDir());
} else {
this.setLocalDir(".");
}
if (params.value("remote_dir") != null && params.value("remote_dir").length() > 0) {
this.setRemoteDir(this.normalizePath(params.value("remote_dir")));
spooler_log.info(".. parameter [remote_dir]: " + this.getRemoteDir());
} else {
this.setRemoteDir(".");
}
if (params.value("create_dir") != null && params.value("create_dir").length() > 0) {
if (params.value("create_dir").equalsIgnoreCase("true") || params.value("create_dir").equalsIgnoreCase("yes") || params.value("create_dir").equals("1")) {
this.setCreateDir(true);
} else {
this.setCreateDir(false);
}
spooler_log.info(".. parameter [create_dir]: " + this.isCreateDir());
} else {
this.setCreateDir(true);
}
if (params.value("recursive") != null && params.value("recursive").length() > 0) {
if (params.value("recursive").equalsIgnoreCase("true") || params.value("recursive").equalsIgnoreCase("yes") || params.value("recursive").equals("1")) {
this.setRecursive(true);
} else {
this.setRecursive(false);
}
spooler_log.info(".. parameter [recursive]: " + this.isRecursive());
} else {
this.setRecursive(false);
}
if (params.value("permissions") != null && params.value("permissions").length() > 0) {
try {
this.setPermissions(Integer.parseInt(params.value("permissions")));
spooler_log.info(".. parameter [permissions]: " + this.getPermissions());
} catch (Exception e) {
throw new Exception("illegal octal value for parameter [permissions]: " + params.value("permissions"));
}
} else {
this.setPermissions(0);
}
if (params.value("truncate_files") != null && params.value("truncate_files").length() > 0) {
if (params.value("truncate_files").equalsIgnoreCase("true") || params.value("truncate_files").equalsIgnoreCase("yes") || params.value("truncate_files").equals("1")) {
this.setTruncateFiles(true);
} else {
this.setTruncateFiles(false);
}
spooler_log.info(".. parameter [truncate_files]: " + this.isTruncateFiles());
} else {
this.setTruncateFiles(true);
}
if (params.value("atomic_suffix") != null && params.value("atomic_suffix").length() > 0) {
this.setAtomicSuffix(params.value("atomic_suffix"));
spooler_log.info(".. parameter [atomic_suffix]: " + this.getAtomicSuffix());
} else {
this.setAtomicSuffix("");
}
} catch (Exception e) {
throw new Exception("error occurred processing parameters: " + e.getMessage());
}
try { // to connect, authenticate and process files
this.getBaseAuthentication();
sftpClient = new SFTPv3Client(this.getSshConnection());
spooler_log.info("remote host SFTP protocol version: " + sftpClient.getProtocolVersion() );
if (!this.sshFileExists(sftpClient, this.getRemoteDir()))
throw new Exception("remote directory does not exist: " + this.getRemoteDir());
if (!new File(this.getLocalDir()).exists()) {
if (this.isCreateDir()) {
try {
// TODO use permissions: should no explicit permissions have been set then use the default permissions of the users home directory
new File(this.getLocalDir()).mkdirs();
} catch (Exception e) {
throw new Exception("error occurred creating local directory [" + this.getLocalDir() + "]: " + e.getMessage());
}
} else {
throw new Exception("local directory does not exist: " + this.getLocalDir());
}
}
if ( isRecursive()) {
spooler_log.debug5("..recursive mode set.");
count = copyRecursive( this.getRemoteDir() , this.getLocalDir(), sftpClient);
} else {
count = copy( this.getRemoteDir() , this.getLocalDir(), sftpClient);
}
switch (count) {
case 0:
throw new Exception("no matching files found");
case 1:
spooler_log.info("1 file transferred");
break;
default:
spooler_log.info(count + " files transferred");
break;
}
} catch (Exception e) {
if (sftpClient != null) try { sftpClient.close(); sftpClient = null; } catch (Exception ex) {} // gracefully ignore this error
throw new Exception("error occurred processing files: " + e.getMessage());
} finally {
if (this.getSshConnection() != null) try { this.getSshConnection().close(); this.setSshConnection(null); } catch (Exception ex) {} // gracefully ignore this error
}
// return value for classic and order driven processing
return (spooler_task.job().order_queue() != null);
} catch (Exception e) {
spooler_log.warn(e.getMessage());
return false;
}
}
/**
* copy a directory from the remote host to the local one recursivly.
*
* @param sourceLocation the source directory on the remote host
* @param targetLocation the target directory on the local host
* @param sftpClient is an instance of SFTPv3Client that makes SFTP client connection over SSH-2
* @return the number of files successfully copied
* @throws Exception
*/
private int copyRecursive(String sourceLocation, String targetLocation,
SFTPv3Client sftpClient) throws Exception {
int count = 0;
if (this.isDirectory(sftpClient, sourceLocation)) {
Vector filelist = sftpClient.ls(sourceLocation);
Iterator iterator = filelist.iterator();
while (iterator.hasNext()) {
SFTPv3DirectoryEntry dirEntry = (SFTPv3DirectoryEntry) iterator
.next();
if (dirEntry == null)
continue;
if (dirEntry.filename.equals(".")
|| dirEntry.filename.equals(".."))
continue;
count += copyRecursive(sourceLocation + "/" + dirEntry.filename, targetLocation + "/" + dirEntry.filename, sftpClient);
} // while
} else {
if ( copyFile(sourceLocation, targetLocation, sftpClient))
count++;
else
return count;
}// else
return count;
}//copyRecursive
/**
* copy a directory from the remote host to the local one.
*
* @param sourceLocation the source directory on the remote host
* @param targetLocation the target directory on the local host
* @param sftpClient is an instance of SFTPv3Client that makes SFTP client connection over SSH-2
* @return the number of files successfully copied
* @throws Exception
*/
private int copy(String sourceLocation, String targetLocation,
SFTPv3Client sftpClient) throws Exception {
int count = 0;
if (this.isDirectory(sftpClient, sourceLocation)) {
Vector filelist = sftpClient.ls(sourceLocation);
Iterator iterator = filelist.iterator();
while (iterator.hasNext()) {
SFTPv3DirectoryEntry dirEntry = (SFTPv3DirectoryEntry) iterator
.next();
if (dirEntry == null)
continue;
if (dirEntry.filename.equals(".")
|| dirEntry.filename.equals("..") || this.isDirectory(sftpClient, sourceLocation + "/"+dirEntry.filename))
continue;
if ( copyFile(sourceLocation + "/" + dirEntry.filename, targetLocation + "/" + dirEntry.filename, sftpClient) ) {
count++;
}
} // while
} // if
return count;
}//copy
/**
*
* @param sourceLocation
* @param targetLocation
* @param sftpClient
* @return
* @throws Exception
*/
private boolean copyFile(String sourceLocation, String targetLocation,
SFTPv3Client sftpClient) throws Exception {
SFTPv3FileHandle sftpFileHandle = null;
FileOutputStream fos = null;
File transferFile = null;
long remoteFileSize = -1;
try {
if (!new File(sourceLocation).getName().matches(this.getFileSpec()))
return false;
// TODO use permissions, should no explicit permissions have
// been set then use the default permissions of the users
// home directory
transferFile = new File(targetLocation
+ this.getAtomicSuffix());
new File(transferFile.getParent()).mkdirs();
remoteFileSize = this.getFileSize(sftpClient, sourceLocation);
spooler_log.info("receiving file: " + sourceLocation + " to " + transferFile.getAbsolutePath() + " " + remoteFileSize + " bytes");
sftpFileHandle = sftpClient.openFileRO(sourceLocation);
// TODO depending on the value of the "truncate" parameter:
// true=files are overwritten (default), false=append
// content of remote files to existing local files, use the
// below offset for implementation
fos = null;
long offset = 0;
try {
fos = new FileOutputStream(transferFile);
byte[] buffer = new byte[1024];
while (true) {
int len = sftpClient.read(sftpFileHandle, offset,
buffer, 0, buffer.length);
if (len <= 0)
break;
fos.write(buffer, 0, len);
offset += len;
}
fos.flush();
fos.close();
fos = null;
} catch (Exception e) {
throw new Exception("error occurred writing file ["
+ transferFile.getAbsolutePath() + "]: "
+ e.getMessage());
} finally {
if (fos != null)
try {
fos.close();
fos = null;
} catch (Exception ex) {
} // gracefully ignore this error
}
sftpClient.closeFile(sftpFileHandle);
sftpFileHandle = null;
if (remoteFileSize > 0 && remoteFileSize != transferFile.length())
throw new Exception("remote file size [" + remoteFileSize + "] and local file size [" + transferFile.length() + "] are different. Number of bytes written to local file: " + offset);
if (this.getAtomicSuffix() != null
&& this.getAtomicSuffix().length() > 0) {
File atomicFile = new File(targetLocation);
if (atomicFile.exists())
atomicFile.delete();
spooler_log.info("renaming file: "
+ transferFile.getAbsolutePath() + " to "
+ atomicFile.getName());
transferFile.renameTo(atomicFile);
}
return true;
} catch (Exception e) {
throw e;
} finally {
try { sftpClient.closeFile(sftpFileHandle); } catch(Exception e ) {}
}
}
/**
* @return Returns the fileSpec.
*/
public String getFileSpec() {
return fileSpec;
}
/**
* @param fileSpec
* The fileSpec to set.
*/
public void setFileSpec(String fileSpec) {
this.fileSpec = fileSpec;
}
/**
* @return Returns the atomicSuffix.
*/
public String getAtomicSuffix() {
return atomicSuffix;
}
/**
* @param atomicSuffix
* The atomicSuffix to set.
*/
public void setAtomicSuffix(String atomicSuffix) {
this.atomicSuffix = atomicSuffix;
}
/**
* @return Returns the localDir.
*/
public String getLocalDir() {
return localDir;
}
/**
* @param localDir The localDir to set.
*/
public void setLocalDir(String localDir) {
this.localDir = localDir;
}
/**
* @return Returns the remoteDir.
*/
public String getRemoteDir() {
return remoteDir;
}
/**
* @param remoteDir The remoteDir to set.
*/
public void setRemoteDir(String remoteDir) {
this.remoteDir = remoteDir;
}
/**
* @return Returns the createDir.
*/
public boolean isCreateDir() {
return createDir;
}
/**
* @param createDir The createDir to set.
*/
public void setCreateDir(boolean createDir) {
this.createDir = createDir;
}
/**
* @return Returns the permissions.
*/
public int getPermissions() {
return permissions;
}
/**
* @param permissions The permissions to set.
*/
public void setPermissions(int permissions) {
this.permissions = permissions;
}
/**
* @return Returns the truncateFiles.
*/
public boolean isTruncateFiles() {
return truncateFiles;
}
/**
* @param truncateFiles The truncateFiles to set.
*/
public void setTruncateFiles(boolean truncateFiles) {
this.truncateFiles = truncateFiles;
}
/**
* @return Returns the recursive.
*/
public boolean isRecursive() {
return recursive;
}
/**
* @param recursive The recursive to set.
*/
public void setRecursive(boolean recursive) {
this.recursive = recursive;
}
}
|