package sos.scheduler.ftp;
import java.io.File;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Vector;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
import sos.net.SOSFTP;
import sos.net.SOSFTPS;
import sos.net.SOSFileTransfer;
import sos.net.SOSMail;
import sos.net.SOSSFTP;
import sos.scheduler.job.JobSchedulerJob;
import sos.spooler.Order;
import sos.spooler.Variable_set;
import sos.util.SOSFileOperations;
import sos.util.SOSSchedulerLogger;
import sos.util.SOSString;
/**
* FTP File Transfer
*
* @author Andreas P�schel
* @author M�r�vet �ks�z
*/
public class JobSchedulerFTPReceive extends JobSchedulerJob {
/** The FTP server will always reply the ftp error codes,
* see http://www.the-eggman.com/seminars/ftp_error_codes.html */
public static final int ERROR_CODE = 300;
private SOSString sosString = new SOSString();
public boolean spooler_process() {
boolean rc = false;
SOSFileTransfer ftpClient = null;
String protocol = "ftp";
boolean sshBasedProtocol = false;
String host = "";
int port = 21;
String user = "anonymous";
String password = "";
String account = "";
String transferMode = "binary";
boolean passiveMode = false;
String remoteDir = "./";
String localDir = ".";
String fileSpec = ".*";
String atomicSuffix = "";
boolean checkSize = true;
long checkInterval = 60;
long checkRetry = 0;
boolean overwriteFiles = true;
boolean appendFiles = false;
boolean removeFiles = false;
boolean skipTransfer = false;
boolean forceFiles = true;
boolean zeroByteFiles = true;
boolean zeroByteFilesStrict = false;
boolean zeroByteFilesRelaxed = false;
String fileNotificationTo = "";
String fileNotificationCC = "";
String fileNotificationBCC = "";
String fileNotificationSubject = "";
String fileNotificationBody = "";
String fileZeroByteNotificationTo = "";
String fileZeroByteNotificationCC = "";
String fileZeroByteNotificationBCC = "";
String fileZeroByteNotificationSubject = "";
String fileZeroByteNotificationBody = "";
boolean isLoggedIn = false;
Vector transferFileList = new Vector();
HashMap checkFileList = new HashMap();
Variable_set params = null;
String replacing = null;
String replacement = null;
boolean recursive = false;
int pollTimeout = 0;
int pollIntervall = 60;
int pollMinFiles = 1;
String pollFilesErrorState = "";
String filePath = "";
boolean isFilePath = false;
// alternative parameters
String alternativeHost = "";
int alternativePort = 0;
String alternativeUser = "";
String alternativePassword = "";
String alternativeAccount = "";
String alternativeRemoteDir = "";
boolean alternativePassiveMode = false;
String alternativeTransferMode = "binary";
// Parallel File Transfer
boolean parallelTransfer = false;
String parallelTransferCheckSetback = "00:00:60";
int parallelTransferCheckRetry = 60;
// Hilfsvariable: wenn gesetzt, dann handelt es sich um einen Wiederholungsauftrag. Also keine Dateien abfrufen
boolean checkParallel = false;
boolean orderSelfDestruct = false;
// Variablen f�r ssh-basierte Transferprotokolle
/** optional proxy configuration */
String proxyHost = "";
int proxyPort = 0;
String proxyUser = "";
String proxyPassword = "";
/** authentication method: publickey, password */
String authenticationMethod = "publickey";
/** key file: ~/.ssh/id_rsa or ~/.ssh/id_dsa */
String authenticationFilename = "";
try {
this.setLogger(new SOSSchedulerLogger(spooler_log));
int iSetbackCount = 1;
try { // to get the job parameters and order parameters
params = spooler.create_variable_set();
if (spooler_task.params() != null) params.merge(spooler_task.params());
if (spooler_job.order_queue() != null && spooler_task.order().params() != null) {
params.merge(spooler_task.order().params());
// initialize setback counter for possible polling
Variable_set orderParams = spooler_task.order().params();
String setbackCount = orderParams.value("setback_count");
getLogger().debug9("setback_count read: "+setbackCount);
if (setbackCount!=null && setbackCount.length()>0){
iSetbackCount = Integer.parseInt(setbackCount);
iSetbackCount++;
}
orderParams.set_var("setback_count", ""+iSetbackCount);
}
if (params.var("ftp_protocol") != null && params.var("ftp_protocol").length() > 0)
protocol = params.var("ftp_protocol");
if (protocol.equalsIgnoreCase("sftp")){
sshBasedProtocol = true;
//use other defaults
port = 22;
}
if (params.var("ftp_host") != null && params.var("ftp_host").length() > 0)
host = params.var("ftp_host");
if (params.var("ftp_port") != null && params.var("ftp_port").length() > 0) {
try {
port = Integer.parseInt(params.var("ftp_port"));
} catch (Exception e) {
throw new Exception("illegal value for parameter [ftp_port]: " + params.var("ftp_port"));
}
}
if (params.var("ftp_user") != null && params.var("ftp_user").length() > 0)
user = params.var("ftp_user");
if (params.var("ftp_password") != null && params.var("ftp_password").length() > 0)
password = params.var("ftp_password");
if (params.var("ftp_account") != null && params.var("ftp_account").length() > 0)
account = params.var("ftp_account");
if (params.var("ftp_transfer_mode") != null && params.var("ftp_transfer_mode").length() > 0)
transferMode = params.var("ftp_transfer_mode");
if (params.var("ftp_passive_mode") != null && params.var("ftp_passive_mode").length() > 0)
passiveMode = (params.var("ftp_passive_mode").equals("1") ||
params.var("ftp_passive_mode").equalsIgnoreCase("true") ||
params.var("ftp_passive_mode").equalsIgnoreCase("yes")? true : false);
if (params.var("ftp_remote_dir") != null && params.var("ftp_remote_dir").length() > 0)
remoteDir = params.var("ftp_remote_dir");
if (params.var("ftp_local_dir") != null && params.var("ftp_local_dir").length() > 0)
localDir = params.var("ftp_local_dir");
if (params.var("ftp_file_spec") != null && params.var("ftp_file_spec").length() > 0)
fileSpec = params.var("ftp_file_spec");
if (params.var("ftp_atomic_suffix") != null && params.var("ftp_atomic_suffix").length() > 0)
atomicSuffix = params.var("ftp_atomic_suffix");
if (params.var("ftp_check_size") != null && params.var("ftp_check_size").length() > 0)
checkSize = (!params.var("ftp_check_size").equals("1")
&& !params.var("ftp_check_size").equalsIgnoreCase("true")
&& !params.var("ftp_check_size").equalsIgnoreCase("yes") ? false : true);
if (params.var("ftp_check_interval") != null && params.var("ftp_check_interval").length() > 0) {
try {
checkInterval = Long.parseLong(params.var("ftp_check_interval"));
} catch (Exception e) {
throw new Exception("illegal value for parameter [ftp_check_interval]: " + params.var("ftp_check_interval"));
}
}
if (params.var("ftp_check_retry") != null && params.var("ftp_check_retry").length() > 0) {
try {
checkRetry = Long.parseLong(params.var("ftp_check_retry"));
} catch (Exception e) {
throw new Exception("illegal value for parameter [ftp_check_retry]: " + params.var("ftp_check_retry"));
}
}
if (params.var("ftp_overwrite_files") != null && params.var("ftp_overwrite_files").length() > 0)
overwriteFiles = (!params.var("ftp_overwrite_files").equals("1")
&& !params.var("ftp_overwrite_files").equalsIgnoreCase("true")
&& !params.var("ftp_overwrite_files").equalsIgnoreCase("yes") ? false : true);
if (params.var("ftp_append_files") != null && params.var("ftp_append_files").length() > 0)
appendFiles = (params.var("ftp_append_files").equals("1")
|| params.var("ftp_append_files").equalsIgnoreCase("true")
|| params.var("ftp_append_files").equalsIgnoreCase("yes") ? true : false);
if (params.var("ftp_remove_files") != null && params.var("ftp_remove_files").length() > 0)
removeFiles = (params.var("ftp_remove_files").equals("1")
|| params.var("ftp_remove_files").equalsIgnoreCase("true")
|| params.var("ftp_remove_files").equalsIgnoreCase("yes") ? true : false);
if (params.var("ftp_skip_transfer") != null && params.var("ftp_skip_transfer").length() > 0)
skipTransfer = (params.var("ftp_skip_transfer").equals("1")
|| params.var("ftp_skip_transfer").equalsIgnoreCase("true")
|| params.var("ftp_skip_transfer").equalsIgnoreCase("yes") ? true : false);
if (params.var("ftp_force_files") != null && params.var("ftp_force_files").length() > 0)
forceFiles = (!params.var("ftp_force_files").equals("1")
&& !params.var("ftp_force_files").equalsIgnoreCase("true")
&& !params.var("ftp_force_files").equalsIgnoreCase("yes") ? false : true);
if (params.var("ftp_file_zero_byte_transfer") != null && params.var("ftp_file_zero_byte_transfer").length() > 0) {
if (params.var("ftp_file_zero_byte_transfer").equals("1")
|| params.var("ftp_file_zero_byte_transfer").equalsIgnoreCase("true")
|| params.var("ftp_file_zero_byte_transfer").equalsIgnoreCase("yes")) {
zeroByteFiles = true;
zeroByteFilesStrict = false;
} else if (params.var("ftp_file_zero_byte_transfer").equalsIgnoreCase("strict")) {
zeroByteFiles = false;
zeroByteFilesStrict = true;
} else if (params.var("ftp_file_zero_byte_transfer").equalsIgnoreCase("relaxed")) {
zeroByteFiles = false;
zeroByteFilesStrict = false;
zeroByteFilesRelaxed = true;
} else {
zeroByteFiles = false;
zeroByteFilesStrict = false;
}
}
if (params.var("ftp_file_notification_to") != null && params.var("ftp_file_notification_to").length() > 0) {
fileNotificationTo = params.var("ftp_file_notification_to");
}
if (params.var("ftp_file_notification_cc") != null && params.var("ftp_file_notification_cc").length() > 0) {
fileNotificationCC = params.var("ftp_file_notification_cc");
}
if (params.var("ftp_file_notification_bcc") != null && params.var("ftp_file_notification_bcc").length() > 0) {
fileNotificationBCC = params.var("ftp_file_notification_bcc");
}
if (params.var("ftp_file_notification_subject") != null && params.var("ftp_file_notification_subject").length() > 0) {
fileNotificationSubject = params.var("ftp_file_notification_subject");
}
if (params.var("ftp_file_notification_body") != null && params.var("ftp_file_notification_body").length() > 0) {
fileNotificationBody = params.var("ftp_file_notification_body");
}
if (params.var("ftp_file_zero_byte_notification_to") != null && params.var("ftp_file_zero_byte_notification_to").length() > 0) {
fileZeroByteNotificationTo = params.var("ftp_file_zero_byte_notification_to");
}
if (params.var("ftp_file_zero_byte_notification_cc") != null && params.var("ftp_file_zero_byte_notification_cc").length() > 0) {
fileZeroByteNotificationCC = params.var("ftp_file_zero_byte_notification_cc");
}
if (params.var("ftp_file_zero_byte_notification_bcc") != null && params.var("ftp_file_zero_byte_notification_bcc").length() > 0) {
fileZeroByteNotificationBCC = params.var("ftp_file_zero_byte_notification_bcc");
}
if (params.var("ftp_file_zero_byte_notification_subject") != null && params.var("ftp_file_zero_byte_notification_subject").length() > 0) {
fileZeroByteNotificationSubject = params.var("ftp_file_zero_byte_notification_subject");
}
if (params.var("ftp_file_zero_byte_notification_body") != null && params.var("ftp_file_zero_byte_notification_body").length() > 0) {
fileZeroByteNotificationBody = params.var("ftp_file_zero_byte_notification_body");
}
if ( params.value("replacing")!=null && params.value("replacing").length()>0 ) {
replacing = params.value("replacing");
}
if ( params.value("replacement")!=null && params.value("replacement").length()>0 ) {
replacement = params.value("replacement");
}
if ( params.value("ftp_recursive")!=null && params.value("ftp_recursive").length()>0 ) {
String sRecursive = "";
sRecursive = params.value("ftp_recursive");
recursive = sosString.parseToBoolean(sRecursive);
}
if ( replacing != null && replacement == null ) {
throw new Exception("job parameter is missing for specified parameter [replacing]: [replacement]");
}
if ( replacing == null && replacement != null ) {
throw new Exception("job parameter is missing for specified parameter [replacement]: [replacing]");
}
if ( params.value("ftp_poll_timeout")!=null && params.value("ftp_poll_timeout").length()>0 ) {
pollTimeout = Integer.parseInt(params.value("ftp_poll_timeout"));
}
if ( params.value("ftp_poll_interval")!=null && params.value("ftp_poll_interval").length()>0 ) {
pollIntervall = Integer.parseInt(params.value("ftp_poll_interval"));
}
if ( params.value("ftp_poll_minfiles")!=null && params.value("ftp_poll_minfiles").length()>0 ) {
pollMinFiles = Integer.parseInt(params.value("ftp_poll_minfiles"));
}
if (params.var("ftp_poll_error_state") != null && params.var("ftp_poll_error_state").length() > 0)
pollFilesErrorState = params.var("ftp_poll_error_state");
// alternative parameters
if (params.var("ftp_alternative_host") != null && params.var("ftp_alternative_host").length() > 0)
alternativeHost = params.var("ftp_alternative_host");
if (params.var("ftp_alternative_port") != null && params.var("ftp_alternative_port").length() > 0)
alternativePort = Integer.parseInt(params.var("ftp_alternative_port"));
if (params.var("ftp_alternative_password") != null && params.var("ftp_alternative_password").length() > 0)
alternativePassword = params.var("ftp_alternative_password");
if (params.var("ftp_alternative_user") != null && params.var("ftp_alternative_user").length() > 0)
alternativeUser = params.var("ftp_alternative_user");
if (params.var("ftp_alternative_account") != null && params.var("ftp_alternative_account").length() > 0)
alternativeAccount = params.var("ftp_alternative_account");
if (params.var("ftp_alternative_remote_dir") != null && params.var("ftp_alternative_remote_dir").length() > 0)
alternativeRemoteDir = params.var("ftp_alternative_remote_dir");
if (params.var("ftp_alternative_passive_mode") != null && params.var("ftp_alternative_passive_mode").length() > 0)
alternativePassiveMode = (params.var("ftp_alternative_passive_mode").equals("1") ? true : false);
if (params.var("ftp_alternative_transfer_mode") != null && params.var("ftp_alternative_transfer_mode").length() > 0)
alternativeTransferMode = params.var("ftp_alternative_transfer_mode");
// parameters for parallel transfer
if (params.var("ftp_parallel") != null && params.var("ftp_parallel").length() > 0)
parallelTransfer = sosString.parseToBoolean(sosString.parseToString(params.var("ftp_parallel")));
if (params.var("ftp_parallel_check_setback") != null && params.var("ftp_parallel_check_setback").length() > 0)
parallelTransferCheckSetback = params.var("ftp_parallel_check_setback");
if (params.var("ftp_parallel_check_retry") != null && params.var("ftp_parallel_check_retry").length() > 0) {
try {
parallelTransferCheckRetry = Integer.parseInt(params.var("ftp_parallel_check_retry"));
} catch (Exception e) {
throw new Exception("illegal value for parameter [ftp_parallel_check_retry]: " + params.var("ftp_parallel_check_retry"));
}
}
// implicit parameters
if(params.var("ftp_check_parallel") != null && params.var("ftp_check_parallel").length() > 0)
checkParallel = sosString.parseToBoolean(sosString.parseToString(params.var("ftp_check_parallel")));
if(params.var("ftp_order_self_destruct") != null && params.var("ftp_order_self_destruct").length() > 0)
orderSelfDestruct = sosString.parseToBoolean(sosString.parseToString(params.var("ftp_order_self_destruct")));
if (sshBasedProtocol){
// parameters for ssh-based protocols
if (params.value("ssh_proxy_host") != null && params.value("ssh_proxy_host").toString().length() > 0) {
proxyHost=params.value("ssh_proxy_host");
}
if (params.value("ssh_proxy_port") != null && params.value("ssh_proxy_port").length() > 0) {
try {
proxyPort=(Integer.parseInt(params.value("ssh_proxy_port")));
} catch (Exception ex) {
throw new Exception("illegal non-numeric value for parameter [ssh_proxy_port]: " + params.value("ssh_proxy_port"));
}
} else {
proxyPort=3128;
}
if (params.value("ssh_proxy_user") != null && params.value("ssh_proxy_user").length() > 0) {
proxyUser=(params.value("ssh_proxy_user"));
}
if (params.value("ssh_proxy_password") != null && params.value("ssh_proxy_password").length() > 0) {
proxyPassword=(params.value("ssh_proxy_password"));
}
if (params.value("ssh_auth_method") != null && params.value("ssh_auth_method").length() > 0) {
if (params.value("ssh_auth_method").equalsIgnoreCase("publickey") || params.value("ssh_auth_method").equalsIgnoreCase("password")) {
authenticationMethod=(params.value("ssh_auth_method"));
} else {
throw new Exception("invalid authentication method [publickey, password] specified: " + params.value("ssh_auth_method"));
}
} else {
authenticationMethod=("publickey");
}
if (params.value("ssh_auth_file") != null && params.value("ssh_auth_file").length() > 0) {
authenticationFilename=(params.value("ssh_auth_file"));
} else {
if (authenticationMethod.equalsIgnoreCase("publickey"))
throw new Exception("no authentication filename was specified as parameter [auth_file");
}
}
// check if http proxy set for SSL/TLS connection
if (params.value("http_proxy_host") != null && params.value("http_proxy_host").toString().length() > 0) {
proxyHost=params.value("http_proxy_host");
}
if (params.value("http_proxy_port") != null && params.value("http_proxy_port").length() > 0) {
try {
proxyPort=(Integer.parseInt(params.value("http_proxy_port")));
} catch (Exception ex) {
throw new NumberFormatException("illegal non-numeric value for parameter [http_proxy_port]: " + params.value("http_proxy_port"));
}
}
} catch (Exception e) {
rc = false;
throw (new Exception("could not process job parameters: " + e));
}
if (fileNotificationTo != null && fileNotificationTo.length() > 0) {
if (fileNotificationSubject == null || fileNotificationSubject.length() == 0) {
if (spooler_job.order_queue() != null) {
fileNotificationSubject = "[info] Job Chain: " + spooler_task.order().job_chain().name() + ", Order: " + spooler_task.order().id() + ", Job: " + spooler_job.name() + " (" + spooler_job.title() + "), Task: " + spooler_task.id();
} else {
fileNotificationSubject = "[info] Job: " + spooler_job.name() + " (" + spooler_job.title() + "), Task: " + spooler_task.id();
}
}
if (fileNotificationBody == null || fileNotificationBody.length() == 0) {
fileNotificationBody = "The following files have been received:\n\n";
}
}
if (fileZeroByteNotificationTo != null && fileZeroByteNotificationTo.length() > 0) {
if (fileZeroByteNotificationSubject == null || fileZeroByteNotificationSubject.length() == 0) {
if (spooler_job.order_queue() != null) {
fileZeroByteNotificationSubject = "[warning] Job Chain: " + spooler_task.order().job_chain().name() + ", Order: " + spooler_task.order().id() + ", Job: " + spooler_job.name() + " (" + spooler_job.title() + "), Task: " + spooler_task.id();
} else {
fileZeroByteNotificationSubject = "[warning] Job: " + spooler_job.name() + " (" + spooler_job.title() + "), Task: " + spooler_task.id();
}
}
if (fileZeroByteNotificationBody == null || fileZeroByteNotificationBody.length() == 0) {
fileZeroByteNotificationBody = "The following files have been received and were removed due to zero byte constraints:\n\n";
}
}
if (params.var("ftp_file_path") != null && params.var("ftp_file_path").length() > 0) {
filePath = params.var("ftp_file_path");
isFilePath = true;
} else {
isFilePath = false;
}
if(isFilePath && parallelTransfer) {
this.getLogger().debug("..parameter ftp_parallel ignored for using ftp_file_path.");
}
this.getLogger().debug(".. job parameter [ftp_protocol] : " + protocol);
this.getLogger().debug(".. job parameter [ftp_host] : " + host);
this.getLogger().debug(".. job parameter [ftp_port] : " + port);
this.getLogger().debug(".. job parameter [ftp_user] : " + user);
this.getLogger().debug(".. job parameter [ftp_account] : " + account);
this.getLogger().debug(".. job parameter [ftp_transfer_mode] : " + transferMode);
this.getLogger().debug(".. job parameter [ftp_passive_mode] : " + passiveMode);
this.getLogger().debug(".. job parameter [ftp_remote_dir] : " + remoteDir);
this.getLogger().debug(".. job parameter [ftp_local_dir] : " + localDir);
this.getLogger().debug(".. job parameter [ftp_file_spec] : " + fileSpec);
this.getLogger().debug(".. job parameter [ftp_atomic_suffix] : " + atomicSuffix);
this.getLogger().debug(".. job parameter [ftp_check_size] : " + checkSize);
this.getLogger().debug(".. job parameter [ftp_check_interval] : " + checkInterval);
this.getLogger().debug(".. job parameter [ftp_check_retry] : " + checkRetry);
this.getLogger().debug(".. job parameter [ftp_overwrite_files] : " + overwriteFiles);
this.getLogger().debug(".. job parameter [ftp_append_files] : " + appendFiles);
this.getLogger().debug(".. job parameter [ftp_remove_files] : " + removeFiles);
this.getLogger().debug(".. job parameter [ftp_force_files] : " + forceFiles);
this.getLogger().debug(".. job parameter [ftp_skip_transfer] : " + skipTransfer);
this.getLogger().debug(".. job parameter [ftp_zero_byte_transfer] zeroByte : " + zeroByteFiles);
this.getLogger().debug(".. job parameter [ftp_zero_byte_transfer] strict : " + zeroByteFilesStrict);
this.getLogger().debug(".. job parameter [ftp_zero_byte_transfer] relaxed : " + zeroByteFilesRelaxed);
this.getLogger().debug(".. job parameter [ftp_file_notification_to] : " + fileNotificationTo);
this.getLogger().debug(".. job parameter [ftp_file_notification_cc] : " + fileNotificationCC);
this.getLogger().debug(".. job parameter [ftp_file_notification_bcc] : " + fileNotificationBCC);
this.getLogger().debug(".. job parameter [ftp_file_notification_subject] : " + fileNotificationSubject);
this.getLogger().debug(".. job parameter [ftp_file_notification_body] : " + fileNotificationBody);
this.getLogger().debug(".. job parameter [ftp_file_zero_byte_notification_to] : " + fileZeroByteNotificationTo);
this.getLogger().debug(".. job parameter [ftp_file_zero_byte_notification_cc] : " + fileZeroByteNotificationCC);
this.getLogger().debug(".. job parameter [ftp_file_zero_byte_notification_bcc] : " + fileZeroByteNotificationBCC);
this.getLogger().debug(".. job parameter [ftp_file_zero_byte_notification_subject]: " + fileZeroByteNotificationSubject);
this.getLogger().debug(".. job parameter [ftp_file_zero_byte_notification_body] : " + fileZeroByteNotificationBody);
this.getLogger().debug(".. job parameter [replacing] : " + replacing);
this.getLogger().debug(".. job parameter [replacement] : " + replacement);
this.getLogger().debug(".. job parameter [ftp_recursive] : " + recursive);
this.getLogger().debug(".. job parameter [ftp_poll_timeout] : " + pollTimeout);
this.getLogger().debug(".. job parameter [ftp_poll_interval] : " + pollIntervall);
this.getLogger().debug(".. job parameter [ftp_poll_minfiles] : " + pollMinFiles);
this.getLogger().debug(".. job parameter [ftp_poll_error_state] : " + pollFilesErrorState);
this.getLogger().debug(".. job parameter [ftp_file_path] : " + filePath);
//alternative Parameter
this.getLogger().debug(".. job parameter [ftp_alternative_host] : " + alternativeHost);
this.getLogger().debug(".. job parameter [ftp_alternative_port] : " + alternativePort);
this.getLogger().debug(".. job parameter [ftp_alternative_user] : " + alternativeUser);
this.getLogger().debug(".. job parameter [ftp_alternative_password] : " + alternativePassword);
this.getLogger().debug(".. job parameter [ftp_alternative_account] : " + alternativeAccount);
this.getLogger().debug(".. job parameter [ftp_alternative_remote_dir] : " + alternativeRemoteDir);
this.getLogger().debug(".. job parameter [ftp_alternative_passive_mode] : " + alternativePassiveMode);
this.getLogger().debug(".. job parameter [ftp_alternative_transfer_mode] : " + alternativeTransferMode);
this.getLogger().debug(".. job parameter [ftp_parallel] : " + parallelTransfer);
this.getLogger().debug(".. job parameter [ftp_parallel_check_setback] : " + parallelTransferCheckSetback);
this.getLogger().debug(".. job parameter [ftp_parallel_check_retry] : " + parallelTransferCheckRetry);
this.getLogger().debug(".. job parameter [ftp_check_parallel] : " + checkParallel);
this.getLogger().debug(".. job parameter [ftp_order_self_destruct] : " + orderSelfDestruct);
if (sshBasedProtocol){
this.getLogger().debug(".. job parameter [ssh_proxy_host] : " + proxyHost);
this.getLogger().debug(".. job parameter [ssh_proxy_port] : " + proxyPort);
this.getLogger().debug(".. job parameter [ssh_proxy_user] : " + proxyUser);
this.getLogger().debug(".. job parameter [ssh_auth_method] : " + authenticationMethod);
this.getLogger().debug(".. job parameter [ssh_auth_file] : " + authenticationFilename);
}
// SSL/TLS
try {
this.getLogger().debug(".. job parameter [http_proxy_host] : " + proxyHost);
this.getLogger().debug(".. job parameter [http_proxy_port] : " + proxyPort);
} catch(Exception e) {}
try { // to check parameters
if(checkParallel) {
boolean bSuccess = true;
String[] paramNames = sosString.parseToString(spooler.variables().names()).split(";");
for ( int i = 0; i < paramNames.length; i++) {
if(paramNames[i].startsWith("ftp_check_receive_" + normalize(spooler_task.order().id()) + ".")) {
if(sosString.parseToString(spooler.var(paramNames[i])).equals("0")) {
// Anzahl der Wiederholung merken
String sRetry = sosString.parseToString(spooler.variables().var("cur_transfer_retry" + normalize(spooler_task.order().id())));
int retry = sRetry.length() == 0 ? 0 : Integer.parseInt(sRetry);
--retry;
spooler.variables().set_var("cur_transfer_retry" + normalize(spooler_task.order().id()), String.valueOf(retry));
if(retry == 0) {
getLogger().debug("terminated cause max order setback reached: " + paramNames[i]);
spooler.variables().set_var("terminated_cause_max_order_setback_" + normalize(spooler_task.order().id()), "1");
return false;
}
getLogger().debug("launch setback: " + parallelTransferCheckRetry + " * " + parallelTransferCheckSetback);
spooler_task.order().setback();
return false;
} else if(sosString.parseToString(spooler.var(paramNames[i])).equals("1")) {
getLogger().debug("successfully terminated: " + paramNames[i]);
} else if(sosString.parseToString(spooler.var(paramNames[i])).equals("2")) {
bSuccess = false;
getLogger().debug("terminated with error : " + paramNames[i]);
}
}
}
return bSuccess;
} else if(params.var("ftp_parent_order_id") != null) {
// Hauptauftrag wurde wegen Erreichens von ftp_parallel_check_retry beendet -> die restlichen Unterauftr�ge sollen nicht durchlaufen
String state = spooler.variables().var("terminated_cause_max_order_setback_" + normalize(params.var("ftp_parent_order_id")));
if(state.equals("1"))
return false;
}
if (host == null || host.length() == 0) throw new Exception("no host was specified");
if (user == null || user.length() == 0) throw new Exception("no user was specified");
if(sosString.parseToString(filePath).length() > 0) {
if(pollMinFiles > 1) {
getLogger().warn("parameter [ftp_poll_minfiles] should not be greater than 1 because Parameter ftp_file_path has been specified. ");
}
}
} catch (Exception e) {
rc = false;
throw (new Exception("invalid or insufficient parameters: " + e));
}
try { // to process ftp
if(localDir.startsWith("\\\\")) {
// replaceAll has bugs
while(localDir.indexOf("\\") != -1) {
localDir = localDir.replace('\\', '/');
}
}
if (localDir.startsWith("file://")) {
if(!new File(createURI(localDir)).exists()) {
throw new Exception("local directory does not exist or is not accessible: " + localDir);
}
}
this.getLogger().info("connecting to host " + host + ", port " + port + ", local directory " + localDir + ", remote directory " + remoteDir +
(isFilePath ? ", file " + filePath : ", file specification " + fileSpec) );
boolean alternativeUse = true; // use alternative settings when first FTP server is not available
int isAlternativeParameterUse = 0;
while (alternativeUse && isAlternativeParameterUse <= 1) {
try {
if (protocol.equalsIgnoreCase("ftp")){
SOSFTP sosftp = new SOSFTP(host, port);
ftpClient = sosftp;
this.getLogger().debug("..ftp server reply [init] [host=" + host + "], [port="+ port + "]: " + ftpClient.getReplyString() );
if (account != null && account.length() > 0) {
isLoggedIn = sosftp.login(user, password, account);
this.getLogger().debug("..ftp server reply [login] [user=" + user + "], [account=" + account + "]: " + ftpClient.getReplyString() );
} else {
isLoggedIn = sosftp.login(user, password);
this.getLogger().debug("..ftp server reply [login] [user=" + user + "]: " + ftpClient.getReplyString());
}
if (!isLoggedIn || sosftp.getReplyCode() > ERROR_CODE) {
throw new Exception("..ftp server reply [login failed] [user=" + user + "], [account=" + account + "]: " + ftpClient.getReplyString() );
}
} else if (protocol.equalsIgnoreCase("sftp")){
SOSSFTP sftpClient = new SOSSFTP(host,port);
ftpClient = sftpClient;
sftpClient.setAuthenticationFilename(authenticationFilename);
sftpClient.setAuthenticationMethod(authenticationMethod);
sftpClient.setPassword(password);
sftpClient.setProxyHost(proxyHost);
sftpClient.setProxyPort(proxyPort);
sftpClient.setProxyPassword(proxyPassword);
sftpClient.setProxyUser(proxyUser);
sftpClient.setUser(user);
sftpClient.connect();
try{
this.getLogger().debug("..sftp server logged in [user=" + user + "], [host=" + host + "]" );
}catch (Exception e){
throw new Exception("..sftp server login failed [user=" + user + "], [host=" + host + "]: " + e );
}
} else if (protocol.equalsIgnoreCase("ftps")){
try{
if ( proxyHost != null && proxyPort != 0) {
System.getProperties().setProperty("proxyHost", proxyHost);
System.getProperties().setProperty("proxyPort", String.valueOf(proxyPort) );
System.getProperties().setProperty("proxySet", "true");
}
SOSFTPS sosftps = new SOSFTPS(host, port);
ftpClient = sosftps;
this.getLogger().debug("..ftp server reply [init] [host=" + host + "], [port="+ port + "]: " + ftpClient.getReplyString() );
isLoggedIn = sosftps.login(user, password);
this.getLogger().debug("..ftp server reply [login] [user=" + user + "]: " + ftpClient.getReplyString());
if (!isLoggedIn || sosftps.getReplyCode() > ERROR_CODE) {
throw new Exception("..ftp server reply [login failed] [user=" + user + "], [account=" + account + "]: " + ftpClient.getReplyString() );
}
}catch (Exception e){
throw new Exception("..ftps server login failed [user=" + user + "], [host=" + host + "]: " + e );
}
} else{
throw new Exception("Unknown protocol: "+protocol);
}
alternativeUse = false;
} catch (Exception ex) {
this.getLogger().info("..error in ftp server init with [host=" + host + "], [port="+ port + "] " + getErrorMessage(ex));
alternativeUse = (alternativeHost.concat(alternativeUser).concat(alternativePassword).
concat(alternativeAccount).length() > 0 || alternativePort != 0);
if(alternativeUse && isAlternativeParameterUse == 0) {
if (ftpClient != null) {
if (isLoggedIn) try { ftpClient.logout(); } catch (Exception e) {} // no error handling
if (ftpClient.isConnected()) try { ftpClient.disconnect(); } catch(Exception e) {} // no error handling
}
isAlternativeParameterUse++;
host = getAlternative(host, alternativeHost);
port = getAlternative(port, alternativePort);
user = getAlternative(user, alternativeUser);
password = getAlternative(password, alternativePassword);
account = getAlternative(account, alternativeAccount);
remoteDir = getAlternative(remoteDir, alternativeRemoteDir);
passiveMode = alternativePassiveMode;
transferMode = getAlternative(transferMode, alternativeTransferMode);
this.getLogger().info("..try login with alternative parameters [host=" + host + "], [port="+ port + "] " +
"[user=" + user + "], [account=" + account + "], [remoteDir=" + remoteDir +"], [passiveMode=" + passiveMode +"], " +
"[transferMode=" + transferMode + "]");
} else {
throw new Exception ("..error in ftp server init with [host=" + host + "], [port="+ port + "] " + ex,ex);
}
}
}
if (ftpClient instanceof SOSFTP){
SOSFTP sosftp = (SOSFTP) ftpClient;
if (passiveMode) {
sosftp.passive();
if (sosftp.getReplyCode() > ERROR_CODE) {
throw new Exception("..ftp server reply [passive]: " + ftpClient.getReplyString());
} else {
this.getLogger().debug("..ftp server reply [passive]: " + ftpClient.getReplyString());
}
}
if (transferMode.equalsIgnoreCase("ascii")) {
if (sosftp.ascii()) {
this.getLogger().debug("..using ASCII mode for file transfer");
this.getLogger().debug("..ftp server reply [ascii]: " + ftpClient.getReplyString());
} else {
throw new Exception(".. could not switch to ASCII mode for file transfer ..ftp server reply [ascii]: " + ftpClient.getReplyString());
}
} else {
if (sosftp.binary()) {
this.getLogger().debug("using binary mode for file transfers.");
this.getLogger().debug("..ftp server reply [binary]: " + ftpClient.getReplyString());
} else {
throw new Exception(".. could not switch to binary mode for file transfer ..ftp server reply [ascii]: " + ftpClient.getReplyString());
}
}
}
if(isFilePath) {
String currRemoteDir = new File(filePath).getParent().replaceAll("\\\\", "/");
if(sosString.parseToString(currRemoteDir).length() > 0) {
if (!ftpClient.changeWorkingDirectory(currRemoteDir)) {
throw new Exception("..ftp server reply [cd] [directory ftp_file_path=" + currRemoteDir + "]: " + ftpClient.getReplyString());
} else {
getLogger().debug("..ftp server reply [cd] [directory ftp_file_path=" + currRemoteDir +"]: " + ftpClient.getReplyString() );
}
}
} else if (sosString.parseToString(remoteDir).length() > 0) {
if (!ftpClient.changeWorkingDirectory(remoteDir)) {
throw new Exception("..ftp server reply [cd] [remoteDir=" + remoteDir + "]: " + ftpClient.getReplyString());
} else {
getLogger().debug("..ftp server reply [cd] [remoteDir=" + remoteDir + "]: " + ftpClient.getReplyString() );
}
}
Vector filelist = null;
if (isFilePath) {
filelist = new Vector();
filelist.add(filePath);
fileSpec = ".*";
} else {
filelist = ftpClient.nList(recursive);
}
int count = 0;
int zeroByteCount = 0;
/*
if (ftpClient.getReplyCode() > ERROR_CODE) {
// no error checking for empty directories
// throw new Exception("..error occurred looking up remote files [" + remoteDir + "]: " + ftpClient.getReplyString());
} else {
getLogger().debug("..ftp server reply [nlist]: " + ftpClient.getReplyString() );
}*/
getLogger().debug("..ftp server reply [nlist]: " + ftpClient.getReplyString() );
Pattern pattern = Pattern.compile(fileSpec, 0);
Iterator iterator = filelist.iterator();
// parallel Transfer
if(parallelTransfer && !isFilePath) {
if(spooler_job.order_queue() == null) {
// parallel Transfer for standalone Job
while (iterator.hasNext()) {
String fileName = (String)iterator.next();
Variable_set newParams = params;
newParams.set_var("ftp_file_path", (remoteDir.endsWith("/") || remoteDir.endsWith("\\") ? remoteDir : remoteDir + "/") + fileName);
getLogger().info("launching job for parallel transfer with parameter: ftp_file_path " + (remoteDir.endsWith("/") || remoteDir.endsWith("\\") ? remoteDir : remoteDir + "/") + fileName);
spooler.job(spooler_task.job().name()).start(params);
}
return false;
} else {
// parallel Transfer for order job
while (iterator.hasNext()) {
String fileName = (String)iterator.next();
Variable_set newParams = spooler.create_variable_set();
if (spooler_task.params() != null) newParams.merge(params);
newParams.set_var("ftp_file_path", (remoteDir.endsWith("/") || remoteDir.endsWith("\\") ? remoteDir : remoteDir + "/") + fileName);
newParams.set_var("ftp_parent_order_id", spooler_task.order().id());
newParams.set_var("ftp_order_self_destruct", "1");
Order newOrder = spooler.create_order();
newOrder.set_state(spooler_task.order().state());
newOrder.set_params(newParams);
spooler.job_chain(spooler_task.order().job_chain().name()).add_order(newOrder);
getLogger().info("launching order for parallel transfer with parameter: ftp_file_path " + (remoteDir.endsWith("/") || remoteDir.endsWith("\\") ? remoteDir : remoteDir + "/") + fileName);
spooler.variables().set_var("ftp_order", normalize(spooler_task.order().id()) + "." + normalize(newOrder.id()) + "." + "0");
spooler.variables().set_var("ftp_check_receive_" + normalize(spooler_task.order().id()) + "." + normalize(newOrder.id()), "0");
}
// am aktuellen Auftrag speichern, dass im Wiederholungsfall per setback() nicht erneut Auftr�ge erzeugt werden sollen, sondern dass deren Erledigungszustand gepr�ft wird:
spooler_task.order().params().set_var("ftp_check_parallel", "yes");
spooler_job.set_delay_order_after_setback(1, parallelTransferCheckSetback);
spooler_job.set_max_order_setbacks(parallelTransferCheckRetry);
spooler_task.order().setback();
spooler.variables().set_var("cur_transfer_retry" + normalize(spooler_task.order().id()), String.valueOf(parallelTransferCheckRetry));
return false;
}
}
// end Parallel Transfer
if (pollTimeout>0){
// before any processing, check if files are available
boolean done= false;
boolean giveUpPoll = false;
double delay = pollIntervall;
double nrOfTries = (pollTimeout*60)/delay;
int tries=0;
while(!done && !giveUpPoll){
tries++;
int matchedFiles=0;
while(iterator.hasNext()) {
String fileName = (String)iterator.next();
if (isFilePath) {
boolean found=false;
try{
// we are already in the directory, so use only name:
File file = new File(fileName);
long si = ftpClient.size(file.getName());
if (si>-1) found = true;
} catch(Exception e){
getLogger().debug9("File "+fileName+" was not found.");
}
if (found){
matchedFiles++;
getLogger().debug8("Found matching file "+fileName);
}
} else {
Matcher matcher = pattern.matcher(fileName);
if (matcher.find()) {
matchedFiles++;
getLogger().debug8("Found matching file "+fileName);
}
}
}
getLogger().debug3(matchedFiles +" matching files found");
if (matchedFiles < pollMinFiles){
if (spooler_job.order_queue() != null && spooler_task.order()!=null){
getLogger().debug3(pollMinFiles +" files required, setting back order.");
spooler_job.set_delay_order_after_setback(1,delay);
spooler_job.set_max_order_setbacks((int) nrOfTries);
spooler_task.order().setback();
done=true;
Variable_set orderParams = spooler_task.order().params();
getLogger().debug9("setback_count is now: " + iSetbackCount + " , maximum number of trials: "+nrOfTries);
if (iSetbackCount>nrOfTries){
orderParams.set_var("setback_count","");
giveUpPoll=true;
} else return true;
} else{ // simple job
if (tries<nrOfTries){
Thread.sleep((long)delay * 1000);
spooler_job.set_state_text("Polling for files... ");
if (isFilePath) {
filelist = new Vector();
filelist.add(filePath);
} else {
filelist = ftpClient.nList(recursive);
}
iterator = filelist.iterator();
} else {
giveUpPoll = true;
}
}
} else{
done=true;
spooler_job.set_state_text("");
}
if (giveUpPoll){
// keep configuration order monitor from repeating:
spooler_task.order().params().set_var("setback", "false");
String message = "Failed to find at least "+pollMinFiles+" files matching \""+
fileSpec +"\" ";
if (isFilePath) message = "Failed to find file \""+filePath+"\" ";
message +="after triggering for "+pollTimeout+" minutes.";
if (matchedFiles>0) message+=" (only "+matchedFiles+" files were found)";
if (pollFilesErrorState!=null && pollFilesErrorState.length()>0){
spooler_task.order().set_state(pollFilesErrorState);
spooler_task.order().params().set_var("ftp_result_error_message", message);
}
if (forceFiles) {
spooler_log.warn(message);
String body = message +"\n";
body += "See attached logfile for details.";
spooler_log.mail().set_body(body);
spooler_task.end();
}else {
spooler_log.info(message);
if (spooler_job.order_queue()!=null) return true;
}
return false;
}
}
}
iterator = filelist.iterator();
while(iterator.hasNext()) {
String fileName = (String)iterator.next();
Matcher matcher = pattern.matcher(fileName);
File transferFile = null;
spooler_log.debug7("Processing file "+fileName);
if(recursive && !isFilePath) {
if (sosString.parseToString(remoteDir).length() > 0) {
if (!ftpClient.changeWorkingDirectory(remoteDir)) {
throw new Exception("..ftp server reply [cd] [remoteDir=" + remoteDir + "]: " + ftpClient.getReplyString());
} else {
getLogger().debug("..ftp server reply [cd] [remoteDir=" + remoteDir + "]: " + ftpClient.getReplyString() );
}
}
}
if (matcher.find() || isFilePath) {
if (replacement != null && replacement.length() > 0 && replacing != null && replacing.length() > 0) {
String currTransferFilename = SOSFileOperations.getReplacementFilename(fileName, replacing, replacement);
this.getLogger().info(" transfer file [" + fileName + "] is renamed to " + currTransferFilename);
if(isFilePath) {
transferFile = this.createFile(localDir
+ (localDir.endsWith("/") || localDir.endsWith("\\") ? "" : "/")
+ new File(currTransferFilename).getName());
} else {
transferFile = this.createFile(localDir
+ (localDir.endsWith("/") || localDir.endsWith("\\") ? "" : "/")
+ currTransferFilename);
}
} else {
if(isFilePath) {
transferFile = this.createFile(localDir
+ (localDir.endsWith("/") || localDir.endsWith("\\") ? "" : "/")
+ new File(fileName).getName());
} else {
transferFile = this.createFile(localDir
+ (localDir.endsWith("/") || localDir.endsWith("\\") ? "" : "/")
+ fileName);
}
}
File transFile = transferFile;
if(transFile.getParent() != null && !transFile.getParentFile().exists()){
if(transFile.getParentFile().mkdirs()) {
this.getLogger().info("..create parent directory [" + transFile.getParentFile() + "]");
} else {
throw new Exception ("..error occurred creating directory [" + transFile.getParentFile() + "]");
}
}
if (!appendFiles && !overwriteFiles && transferFile.exists()) {
this.getLogger().info("..ftp transfer skipped for file [no overwrite]: " + transferFile.getName());
continue;
}
long bytesSent = 0;
if (!appendFiles && atomicSuffix != null && atomicSuffix.length() > 0) {
File atomicFile = new File(transferFile.getAbsolutePath() + atomicSuffix);
File file = new File(fileName);
if(recursive) {
if(file.getParent() != null && !isFilePath) {
String[] splitParent = file.getParent().split("\\\\");
for (int i=0; i<splitParent.length; i++) {
if(sosString.parseToString(splitParent[i]).length() > 0) {
if (!ftpClient.changeWorkingDirectory(splitParent[i])) {
throw new Exception("..ftp server reply [cd] [remoteDir=" + splitParent[i] + "]: " + ftpClient.getReplyString());
} else {
getLogger().debug("..ftp server reply [cd] [remoteDir=" + splitParent[i] + "]: " + ftpClient.getReplyString() );
}
}
}
}
}
if(skipTransfer){
bytesSent = ftpClient.size(file.getName());
fileNotificationBody += transferFile.getName() + "\n";
transferFileList.add(transferFile);
count++;
}else{
bytesSent = this.transferFile(ftpClient, new File(file.getName()), atomicFile, checkRetry, checkInterval, checkSize, appendFiles);
if (transferFile.exists()) {
if (overwriteFiles) {
if (!transferFile.delete()) {
throw new Exception("could not overwrite local file: " + transferFile.getAbsolutePath());
}
}
}
if (bytesSent <= 0 && !zeroByteFiles && zeroByteFilesStrict) {
this.getLogger().info("removing local file : " + transferFile.getAbsolutePath() + " due to zero byte strict constraint");
if (!atomicFile.delete()) {
throw new Exception("..error occurred, could not remove temporary file: " + atomicFile.getAbsolutePath());
}
fileZeroByteNotificationBody += transferFile.getName() + "\n";
zeroByteCount++;
} else if (bytesSent <= 0 && !zeroByteFiles) {
checkFileList.put(atomicFile, transferFile);
} else {
this.getLogger().info("renaming local temporary file : " + transferFile.getAbsolutePath());
if (!atomicFile.renameTo(transferFile)) {
throw new Exception("could not rename temporary file [" + atomicFile.getCanonicalPath() + "] to: " + transferFile.getAbsolutePath());
}
fileNotificationBody += transferFile.getName() + "\n";
transferFileList.add(transferFile);
count++;
}
}
} else {
File file = new File(fileName);
if(recursive) {
if(file.getParent() != null) {
String[] splitParent = file.getParent().split("\\\\");
for (int i=0; i<splitParent.length; i++) {
if (!ftpClient.changeWorkingDirectory(splitParent[i])) {
throw new Exception("..ftp server reply [cd] [remoteDir=" + splitParent[i] + "]: " + ftpClient.getReplyString());
} else {
getLogger().debug("..ftp server reply [cd] [remoteDir=" + splitParent[i] + "]: " + ftpClient.getReplyString() );
}
}
}
}
if(skipTransfer){
bytesSent = ftpClient.size(file.getName());
fileNotificationBody += transferFile.getName() + "\n";
transferFileList.add(transferFile);
count++;
}else{
bytesSent = this.transferFile(ftpClient, new File(file.getName()), transferFile, checkRetry, checkInterval, checkSize, appendFiles);
if (bytesSent <= 0 && !zeroByteFiles && zeroByteFilesStrict) {
this.getLogger().info("removing local file : " + transferFile.getAbsolutePath() + " due to zero byte strict constraint");
if (!transferFile.delete()) {
throw new Exception("..error occurred, could not remove temporary file: " + transferFile.getAbsolutePath());
}
fileZeroByteNotificationBody += transferFile.getName() + "\n";
zeroByteCount++;
} else if (bytesSent <= 0 && !zeroByteFiles) {
checkFileList.put(transferFile, transferFile);
} else {
fileNotificationBody += transferFile.getName() + "\n";
transferFileList.add(transferFile);
count++;
}
}
}
if (removeFiles) {
boolean ok = ftpClient.delete(new File(fileName).getName());
// rueckgabewert genuegt
/* if (ftpClient.getReplyCode() > ERROR_CODE) {
throw new Exception("..error occurred removing remote file [" + transferFile.getName() + "]: " + ftpClient.getReplyString());
} else */
if (!ok) {
throw new Exception("..error occurred, could not remove remote file [" + transferFile.getName() + "]: " + ftpClient.getReplyString());
} else {
this.getLogger().info("removing remote file: " + transferFile.getName());
}
}
}
}
if (!zeroByteFiles) {
Iterator checkFileListIterator = checkFileList.keySet().iterator();
while(checkFileListIterator.hasNext()) {
File checkFile = (File) checkFileListIterator.next();
File transferFile = (File) checkFileList.get(checkFile);
if (checkFile == null || transferFile == null) throw new Exception("..error occurred, empty file list is corrupted");
// if no files with more than zero byte have been transferred then all zero byte files will be removed
if (transferFileList.isEmpty() || zeroByteFilesRelaxed) {
this.getLogger().info("removing local temporary file : " + checkFile.getAbsolutePath() + " due to zero byte constraint");
if (!checkFile.delete()) {
throw new Exception("could not remove temporary file: " + checkFile.getAbsolutePath());
}
zeroByteCount++;
fileZeroByteNotificationBody += transferFile.getName() + "\n";
// otherwise rename files with atomic suffixes to their target names
} else {
if (atomicSuffix != null && atomicSuffix.length() > 0 && checkFile.getAbsolutePath().endsWith(atomicSuffix)) {
this.getLogger().info("renaming local temporary file: " + transferFile.getAbsolutePath());
if (!checkFile.renameTo(transferFile)) {
throw new Exception("could not rename temporary file [" + checkFile.getCanonicalPath() + "] to: " + transferFile.getAbsolutePath());
}
}
count++;
fileNotificationBody += transferFile.getName() + "\n";
}
}
}
if (zeroByteCount > 0 && fileZeroByteNotificationTo != null && fileZeroByteNotificationTo.length() > 0) {
sendMail(fileZeroByteNotificationTo, fileZeroByteNotificationCC, fileZeroByteNotificationBCC, fileZeroByteNotificationSubject, fileZeroByteNotificationBody);
}
if (count > 0 && fileNotificationTo != null && fileNotificationTo.length() > 0) {
sendMail(fileNotificationTo, fileNotificationCC, fileNotificationBCC, fileNotificationSubject, fileNotificationBody);
}
String received = "received";
if (skipTransfer) received = "found";
switch (count) {
case 0: if (zeroByteCount > 0 && zeroByteFilesRelaxed) {
spooler_job.set_state_text("no matching files found, " + zeroByteCount + " zero byte files skipped");
this.getLogger().info("no matching files found, " + zeroByteCount + " zero byte files skipped");
} else if (zeroByteCount > 0 && zeroByteFilesStrict) {
throw new Exception("zero byte file(s) found");
} else if (forceFiles) {
throw new Exception("no matching files found");
} else {
spooler_job.set_state_text("no matching files found");
this.getLogger().info("no matching files found");
}
if (!forceFiles) {
return false;
} else {
rc = (!forceFiles ? true : !zeroByteFilesRelaxed);
}
break;
case 1: this.getLogger().info("1 file "+received + ((zeroByteCount > 0) ? ", " + zeroByteCount + " files skipped due to zero byte constraint" : "") );
spooler_job.set_state_text("1 file "+received + ((zeroByteCount > 0) ? ", " + zeroByteCount + " files skipped due to zero byte constraint" : "") );
rc = true;
break;
default: this.getLogger().info(count + " files "+received + ((zeroByteCount > 0) ? ", " + zeroByteCount + " files skipped due to zero byte constraint" : "") );
spooler_job.set_state_text(count + " files "+received + ((zeroByteCount > 0) ? ", " + zeroByteCount + " files skipped due to zero byte constraint" : "") );
rc = true;
break;
}
String fileNames="";
String filePaths="";
Iterator transferredIterator = transferFileList.iterator();
while (transferredIterator.hasNext()){
File curFile = (File) transferredIterator.next();
filePaths += curFile.getAbsolutePath();
fileNames += curFile.getName();
if (transferredIterator.hasNext()) {
filePaths+=";";
fileNames+=";";
}
}
// return the number of transferred files and filenames
if (spooler_job.order_queue() != null) {
if (spooler_task.order() != null && spooler_task.order().params() != null) {
spooler_task.order().params().set_var("ftp_result_files", Integer.toString(count));
spooler_task.order().params().set_var("ftp_result_zero_byte_files", Integer.toString(zeroByteCount ));
spooler_task.order().params().set_var("ftp_result_filenames", fileNames);
spooler_task.order().params().set_var("ftp_result_filepaths", filePaths);
spooler_task.order().params().set_var("setback_count","");
}
} else {
if (spooler_task.params() != null){
spooler_task.params().set_var("ftp_result_files", Integer.toString(count));
spooler_task.params().set_var("ftp_result_zero_byte_files", Integer.toString(zeroByteCount));
spooler_task.params().set_var("ftp_result_filenames", fileNames);
spooler_task.params().set_var("ftp_result_filepaths", filePaths);
}
}
if(parallelTransfer && isFilePath && spooler_job.order_queue() != null) {
spooler.variables().set_var("ftp_check_receive_" + normalize(params.var("ftp_parent_order_id")) + "." + normalize(spooler_task.order().id()), "1");
}
processResult(rc, "");
return (spooler_task.job().order_queue() == null) ? false : rc;
} catch (Exception e) {
rc = false;
if(parallelTransfer && isFilePath && spooler_job.order_queue() != null) {
spooler.variables().set_var("ftp_check_receive_" + normalize(normalize(params.var("ftp_parent_order_id"))) + "." + normalize(spooler_task.order().id()), "2");
}
spooler_job.set_state_text("could not process file transfer: " + e);
throw (new Exception("could not process file transfer: " + e,e));
} finally {
if(parallelTransfer) {
if(orderSelfDestruct) {
// positiven Endzustand f�r den parallel gestarteten Auftrag finden
String state = "";
sos.spooler.Job_chain_node node = spooler_task.order().job_chain_node();
while (node != null) {
node = node.next_node();
if(node != null)
state = node.state();
}
// Endzustand
spooler_task.order().set_state(state);
}
}
if (ftpClient != null) {
if (isLoggedIn) try { ftpClient.logout(); } catch (Exception e) {} // no error handling
if (ftpClient.isConnected()) try { ftpClient.disconnect(); } catch(Exception e) {} // no error handling
}
}
} catch (Exception e){
processResult(false, e.toString());
spooler_log.warn ("ftp processing failed: " + e.toString());
if (spooler_job.order_queue() != null) {
if (spooler_task.order() != null && spooler_task.order().params() != null) {
spooler_task.order().params().set_var("setback_count","");
}
}
return false;
}
}
private long transferFile(SOSFileTransfer ftpClient, File sourceFile, File targetFile, long checkRetry, long checkInterval, boolean checkSize,
boolean appendFiles) throws Exception {
long retry = (checkRetry > 0) ? checkRetry : 0;
long interval = (checkInterval > 0) ? checkInterval : 60;
long currentBytesSent = 0;
long previousBytesSent = 0;
try {
this.getLogger().info("receiving file: " + targetFile.getAbsolutePath() + " " + ftpClient.size(sourceFile.getName()) + " bytes");
for(int i=-1; i<retry; i++) {
currentBytesSent = ftpClient.getFile( sourceFile.getName(), targetFile.getAbsolutePath(), appendFiles );
if (currentBytesSent < 0) currentBytesSent = 0;
// error handling mit exception
/*
if (ftpClient.getReplyCode() > ERROR_CODE) {
throw new Exception("..error occurred receiving file [" + targetFile.getAbsolutePath() + "]: " + ftpClient.getReplyString());
} else {
this.getLogger().debug("..ftp server reply [getFile] [" + targetFile.getAbsolutePath() + ", size=" + currentBytesSent + "]: " + ftpClient.getReplyString());
}*/
this.getLogger().debug("..ftp server reply [getFile] [" + targetFile.getAbsolutePath() + ", size=" + currentBytesSent + "]: " + ftpClient.getReplyString());
if (appendFiles == false && retry > 0 && currentBytesSent != previousBytesSent) {
this.getLogger().info("..retry " + (i+2) + " of " + retry + " to wait " + interval + "s for file transfer being completed, current file size: " + currentBytesSent + " bytes");
try {
Thread.sleep(checkInterval*1000);
} catch (InterruptedException e){} // the VM doesn't want us to sleep anymore, so get back to work
} else {
break;
}
previousBytesSent = currentBytesSent;
}
if (checkSize && targetFile.length() > 0 && ftpClient.size(sourceFile.getName()) != currentBytesSent) {
throw new Exception("..error occurred receiving file, source file size [" + ftpClient.size(sourceFile.getName()) + "] does not match number of bytes transferred [" + currentBytesSent + "], target file size is " + targetFile.length());
}
return currentBytesSent;
} catch (Exception e){
throw new Exception("file transfer failed: " + e.getMessage());
}
}
/**
* send mail with Job Scheduler settings
*
* @param recipient
* @param recipientCC carbon copy recipient
* @param recipientBCC blind carbon copy recipient
* @param subject
* @param body
* @throws Exception
*/
public void sendMail(String recipient, String recipientCC, String recipientBCC, String subject, String body) throws Exception {
try {
SOSMail sosMail = new SOSMail(spooler_log.mail().smtp());
sosMail.setQueueDir(spooler_log.mail().queue_dir());
sosMail.setFrom(spooler_log.mail().from());
sosMail.setContentType("text/plain");
sosMail.setEncoding("Base64");
String recipients[] = recipient.split(",");
for(int i=0; i<recipients.length; i++) {
sosMail.addRecipient(recipients[i].trim());
}
String recipientsCC[] = recipientCC.split(",");
for(int i=0; i<recipientsCC.length; i++) {
sosMail.addCC(recipientsCC[i].trim());
}
String recipientsBCC[] = recipientBCC.split(",");
for(int i=0; i<recipientsBCC.length; i++) {
sosMail.addBCC(recipientsBCC[i].trim());
}
sosMail.setSubject(subject);
sosMail.setBody(body);
sosMail.setSOSLogger(this.getLogger());
this.getLogger().info("sending mail: \n" + sosMail.dumpMessageAsString());
if (!sosMail.send()){
this.getLogger().warn("mail server is unavailable, mail for recipient [" + recipient + "] is queued in local directory [" + sosMail.getQueueDir() + "]:" + sosMail.getLastError());
}
sosMail.clearRecipients();
} catch (Exception e) {
throw new Exception("error occurred sending mai: " + e.getMessage());
}
}
private String getAlternative(String param, String alternativeParam) {
try {
if(sosString.parseToString(alternativeParam).length() > 0) {
return alternativeParam;
} else {
return param;
}
} catch (Exception e) {
spooler_log.warn("error in getAlternative(): " + e.getMessage());
return param;
}
}
private int getAlternative(int param, int alternativeParam) {
try {
if(alternativeParam > 0) {
return alternativeParam;
} else {
return param;
}
} catch (Exception e) {
spooler_log.warn("error in getAlternative(): " + e.getMessage());
return param;
}
}
private String normalize(String str) {
return str.replaceAll(",", "_");
}
public static String getErrorMessage(Exception ex) throws Exception {
String s = "";
try {
Throwable tr = ex.getCause();
if(ex.toString() != null)
s = ex.toString();
while (tr != null){
if(s.indexOf(tr.toString()) == -1)
s = (s.length() > 0 ? s + ", " : "") + tr.toString();
tr = tr.getCause();
}
} catch (Exception e) {
throw ex;
}
return s;
}
protected void processResult(boolean rc, String message) {
// do nothing, entry point for subclasses
}
}
|