added delete from remote if not present on source, fixed a bug on comparison of already uploaded files, changed md5 pathdir of large files, added print messages, fixed remote list of encrypted files if there are multiple encrypted versions of the same file
This commit is contained in:
@@ -7,7 +7,7 @@ from utility import read_in_chunks
|
|||||||
import time
|
import time
|
||||||
import shutil
|
import shutil
|
||||||
|
|
||||||
def launch(localpath,temp_dir,swift_container,prefix,size_limit_to_segment,size_limit_reading_os,upload,uploadlarge,fail_tries ,md5_compare, encrypted,encrypt_key,excluded_patterns,copy_to_dir):
|
def launch(localpath,temp_dir,swift_container,prefix,size_limit_to_segment,size_limit_reading_os,upload,uploadlarge,fail_tries ,md5_compare, encrypted,encrypt_key,excluded_patterns,copy_to_dir,delete):
|
||||||
print ("Localpath " + localpath)
|
print ("Localpath " + localpath)
|
||||||
print ("Temppath " + temp_dir)
|
print ("Temppath " + temp_dir)
|
||||||
print ("Swift container " + swift_container)
|
print ("Swift container " + swift_container)
|
||||||
@@ -44,9 +44,9 @@ def launch(localpath,temp_dir,swift_container,prefix,size_limit_to_segment,size_
|
|||||||
print("___________")
|
print("___________")
|
||||||
swift_conn = authentication.set_authentication ()
|
swift_conn = authentication.set_authentication ()
|
||||||
swift_conn,objects = utility.get_list(fail_tries,swift_conn,swift_container,prefix)
|
swift_conn,objects = utility.get_list(fail_tries,swift_conn,swift_container,prefix)
|
||||||
byte0real,byte0manifest,swift_conn,remotefiles,remotefiles_md5 = utility.list_compute_correct_size (fail_tries,objects,swift_conn,swift_container,prefix)
|
byte0real,byte0manifest,swift_conn,remotefiles,remotefiles_md5,remotefiles_xobj = utility.list_compute_correct_size (fail_tries,objects,swift_conn,swift_container,prefix)
|
||||||
if encrypted:
|
if encrypted:
|
||||||
remotefiles_encr = utility.list_compute_correct_names_for_enctyption(objects,prefix)
|
remotefiles_encr,list_enc_old = utility.list_compute_correct_names_for_encryption(objects,prefix)
|
||||||
|
|
||||||
print ("Files remoti " + str(len(remotefiles)))
|
print ("Files remoti " + str(len(remotefiles)))
|
||||||
|
|
||||||
@@ -90,7 +90,7 @@ def launch(localpath,temp_dir,swift_container,prefix,size_limit_to_segment,size_
|
|||||||
if upload_file:
|
if upload_file:
|
||||||
if encrypted :
|
if encrypted :
|
||||||
lnameenc = lname + "_xg10v10_encrypted"
|
lnameenc = lname + "_xg10v10_encrypted"
|
||||||
xtime = str(int(time()))
|
xtime = str(int(time.time()))
|
||||||
if lnameenc not in remotefiles_encr.keys() or localfiles[lname] != int((remotefiles_encr[lnameenc]).split("_xg10v10_")[2]) or remotefiles[remotefiles_encr[lnameenc]] != utility.total_size_encrypted(localfiles[lname]) :
|
if lnameenc not in remotefiles_encr.keys() or localfiles[lname] != int((remotefiles_encr[lnameenc]).split("_xg10v10_")[2]) or remotefiles[remotefiles_encr[lnameenc]] != utility.total_size_encrypted(localfiles[lname]) :
|
||||||
if upload:
|
if upload:
|
||||||
with open(localpath + lname, 'rb') as f:
|
with open(localpath + lname, 'rb') as f:
|
||||||
@@ -223,7 +223,7 @@ def launch(localpath,temp_dir,swift_container,prefix,size_limit_to_segment,size_
|
|||||||
skipped_uploads = 0
|
skipped_uploads = 0
|
||||||
for file, size in difffiles.items():
|
for file, size in difffiles.items():
|
||||||
hash_dir = hashlib.md5()
|
hash_dir = hashlib.md5()
|
||||||
hash_dir.update((utility.folder_from_path(file,utility.set_dash())[:-1]).encode("utf-8"))
|
hash_dir.update((utility.dash_replace(prefix + file)).encode("utf-8"))
|
||||||
hash_dir = hash_dir.hexdigest()
|
hash_dir = hash_dir.hexdigest()
|
||||||
if encrypted:
|
if encrypted:
|
||||||
local_path_corrected =temp_dir
|
local_path_corrected =temp_dir
|
||||||
@@ -246,6 +246,7 @@ def launch(localpath,temp_dir,swift_container,prefix,size_limit_to_segment,size_
|
|||||||
for piece in read_in_chunks(f,size_limit_reading_os):
|
for piece in read_in_chunks(f,size_limit_reading_os):
|
||||||
hash.update(piece)
|
hash.update(piece)
|
||||||
if bytes_written == 0:
|
if bytes_written == 0:
|
||||||
|
print("Creating segment: " + str(counter))
|
||||||
t = open(temp_dir + utility.file_only_name(file,utility.set_dash()) + "_" + str(format_numbers_for_large_files(str(counter),len(str(math.ceil( (size/size_limit_to_segment) * 10 ))))),'wb')
|
t = open(temp_dir + utility.file_only_name(file,utility.set_dash()) + "_" + str(format_numbers_for_large_files(str(counter),len(str(math.ceil( (size/size_limit_to_segment) * 10 ))))),'wb')
|
||||||
if (bytes_written + len(piece) <= size_limit_to_segment):
|
if (bytes_written + len(piece) <= size_limit_to_segment):
|
||||||
t.write(piece)
|
t.write(piece)
|
||||||
@@ -259,12 +260,13 @@ def launch(localpath,temp_dir,swift_container,prefix,size_limit_to_segment,size_
|
|||||||
counter = counter + 1
|
counter = counter + 1
|
||||||
hash = hash.hexdigest()
|
hash = hash.hexdigest()
|
||||||
large_segments_created = True
|
large_segments_created = True
|
||||||
|
print("Large segments created")
|
||||||
# check if there are uploaded segments
|
# check if there are uploaded segments
|
||||||
if not large_segments_uploaded:
|
if not large_segments_uploaded:
|
||||||
headers,remote_segments_list = swift_conn.get_container(swift_container + "_segments", prefix =hash + "_xg10v10_" + hash_dir + "_xg10v10_" + str(size_limit_to_segment) + "/",full_listing=True )
|
headers,remote_segments_list = swift_conn.get_container(swift_container + "_segments", prefix =hash + "_xg10v10_" + hash_dir + "_xg10v10_" + str(size_limit_to_segment) + "/",full_listing=True )
|
||||||
remote_segments_dict = {}
|
remote_segments_dict = {}
|
||||||
for o in remote_segments_list :
|
for o in remote_segments_list :
|
||||||
remote_segments_dict[o["name"].replace(hash +"_xg10v10_" + str(size_limit_to_segment) + "/","")] = o["bytes"]
|
remote_segments_dict[o["name"].replace(hash + "_xg10v10_" + hash_dir + "_xg10v10_" + str(size_limit_to_segment) + "/","")] = o["bytes"]
|
||||||
for local_segment_name,local_segment_size in local_segments_dict.items() :
|
for local_segment_name,local_segment_size in local_segments_dict.items() :
|
||||||
if (local_segment_name) not in remote_segments_dict.keys() or local_segment_size != remote_segments_dict[local_segment_name]:
|
if (local_segment_name) not in remote_segments_dict.keys() or local_segment_size != remote_segments_dict[local_segment_name]:
|
||||||
local_segments_to_upload_dict[local_segment_name] = local_segment_size
|
local_segments_to_upload_dict[local_segment_name] = local_segment_size
|
||||||
@@ -313,4 +315,41 @@ def launch(localpath,temp_dir,swift_container,prefix,size_limit_to_segment,size_
|
|||||||
else:
|
else:
|
||||||
print("Upload Disabled")
|
print("Upload Disabled")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
dellist = []
|
||||||
|
print("")
|
||||||
|
print("Computing deletion list...")
|
||||||
|
if encrypted:
|
||||||
|
#update remote list with new files encrypted
|
||||||
|
swift_conn,objects = utility.get_list(fail_tries,swift_conn,swift_container,prefix)
|
||||||
|
remotefiles_encr,list_enc_old = utility.list_compute_correct_names_for_encryption(objects,prefix)
|
||||||
|
|
||||||
|
for o in list_enc_old:
|
||||||
|
dellist.append(o)
|
||||||
|
for rname in remotefiles_encr.keys():
|
||||||
|
if rname.endswith("_xg10v10_encrypted"):
|
||||||
|
rname_pure = rname.split("_xg10v10_encrypted")[0]
|
||||||
|
if rname_pure in remotefiles_encr.keys():
|
||||||
|
dellist.append(remotefiles_encr[rname_pure])
|
||||||
|
if rname_pure not in localfiles.keys():
|
||||||
|
dellist.append(remotefiles_encr[rname])
|
||||||
|
else:
|
||||||
|
if rname not in localfiles.keys():
|
||||||
|
dellist.append(remotefiles_encr[rname])
|
||||||
|
else:
|
||||||
|
for rname in remotefiles.keys():
|
||||||
|
if rname not in localfiles.keys():
|
||||||
|
dellist.append(rname)
|
||||||
|
|
||||||
|
print("___________Files to delete______")
|
||||||
|
for files in dellist:
|
||||||
|
print(files)
|
||||||
|
print("___________")
|
||||||
|
if delete :
|
||||||
|
for object in dellist:
|
||||||
|
swift_conn = utility.delete_object(swift_conn,swift_container,prefix + object,remotefiles_xobj[object],fail_tries)
|
||||||
|
else:
|
||||||
|
print("Delete disabled")
|
||||||
|
|
||||||
swift_conn.close()
|
swift_conn.close()
|
||||||
|
|||||||
@@ -28,7 +28,8 @@ size_limit_reading_os = 134217728 #must be a power of 2 and smaller/equal than s
|
|||||||
# 4GB 4294967296
|
# 4GB 4294967296
|
||||||
upload = False
|
upload = False
|
||||||
enableLarge = True
|
enableLarge = True
|
||||||
fail_tries = 100
|
delete_remote_files_inexistent_on_source = True
|
||||||
|
fail_tries = 99999999999
|
||||||
temp_path = "\\\\?\\" + "c:\\temp\\"
|
temp_path = "\\\\?\\" + "c:\\temp\\"
|
||||||
excluded_patterns = ["Thumbs.db",".DS_Store","_gsdata_","__MACOSX", "desktop.ini","@eaDir"]
|
excluded_patterns = ["Thumbs.db",".DS_Store","_gsdata_","__MACOSX", "desktop.ini","@eaDir"]
|
||||||
batch = [
|
batch = [
|
||||||
@@ -41,5 +42,5 @@ batch = [
|
|||||||
|
|
||||||
###############################################################################################
|
###############################################################################################
|
||||||
for job in batch:
|
for job in batch:
|
||||||
#local folder,temp path, swift container, swift prefix, size to segment, size reading limit os, upload enabled?. upload large enabled? , fail tries, md5 comparison enabled?, encrypted?, encryption_key,additional_excluded_patterns,copy_to_dir
|
#local folder,temp path, swift container, swift prefix, size to segment, size reading limit os, upload enabled?. upload large enabled? , fail tries, md5 comparison enabled?, encrypted?, encryption_key,additional_excluded_patterns,copy_to_dir, delete_inexistent_remote_files
|
||||||
local_to_swift.launch(job[0],temp_path,job[1],job[2],size_limit_to_segment,size_limit_reading_os,upload,enableLarge,fail_tries, job[3],job[4],job[5], job[6] + excluded_patterns,job[7])
|
local_to_swift.launch(job[0],temp_path,job[1],job[2],size_limit_to_segment,size_limit_reading_os,upload,enableLarge,fail_tries, job[3],job[4],job[5], job[6] + excluded_patterns,job[7],delete_remote_files_inexistent_on_source)
|
||||||
@@ -34,8 +34,8 @@ def launch(localpath,swift_container,prefix,size_limit_reading_os,download,fail_
|
|||||||
print("___________")
|
print("___________")
|
||||||
swift_conn = authentication.set_authentication ()
|
swift_conn = authentication.set_authentication ()
|
||||||
swift_conn,objects = utility.get_list(fail_tries,swift_conn,swift_container,prefix)
|
swift_conn,objects = utility.get_list(fail_tries,swift_conn,swift_container,prefix)
|
||||||
byte0real,byte0manifest,swift_conn,remotefiles,remotefiles_md5 = utility.list_compute_correct_size (fail_tries,objects,swift_conn,swift_container,prefix)
|
byte0real,byte0manifest,swift_conn,remotefiles,remotefiles_md5,remotefiles_xobj = utility.list_compute_correct_size (fail_tries,objects,swift_conn,swift_container,prefix)
|
||||||
remotefiles_encr = utility.list_compute_correct_names_for_enctyption(objects,prefix)
|
remotefiles_encr,list_enc_old = utility.list_compute_correct_names_for_encryption(objects,prefix)
|
||||||
|
|
||||||
print ("Files remoti " + str(len(remotefiles)))
|
print ("Files remoti " + str(len(remotefiles)))
|
||||||
|
|
||||||
|
|||||||
56
utility.py
56
utility.py
@@ -59,6 +59,7 @@ def list_compute_correct_size (fail_tries,objects,swift_conn,swift_container,pre
|
|||||||
|
|
||||||
remotefiles = {}
|
remotefiles = {}
|
||||||
remotefiles_md5 = {}
|
remotefiles_md5 = {}
|
||||||
|
remotefiles_xobj = {}
|
||||||
byte0real = 0
|
byte0real = 0
|
||||||
byte0manifest = 0
|
byte0manifest = 0
|
||||||
|
|
||||||
@@ -91,20 +92,25 @@ def list_compute_correct_size (fail_tries,objects,swift_conn,swift_container,pre
|
|||||||
print("Impossible to get remote large file md5. Cause: Not uploaded with xgiovio method (md5 in x-object-manifest)")
|
print("Impossible to get remote large file md5. Cause: Not uploaded with xgiovio method (md5 in x-object-manifest)")
|
||||||
o["hash"] = "0"
|
o["hash"] = "0"
|
||||||
byte0manifest = byte0manifest + 1
|
byte0manifest = byte0manifest + 1
|
||||||
|
remotefiles_xobj[remote_dash_replace(o["name"].replace(prefix,""))] = oheaders["x-object-manifest"]
|
||||||
else:
|
else:
|
||||||
|
remotefiles_xobj[remote_dash_replace(o["name"].replace(prefix,""))] = None
|
||||||
print ("0byte file " + o["name"] + " e' un file normale" )
|
print ("0byte file " + o["name"] + " e' un file normale" )
|
||||||
byte0real = byte0real + 1
|
byte0real = byte0real + 1
|
||||||
|
else:
|
||||||
|
remotefiles_xobj[remote_dash_replace(o["name"].replace(prefix,""))] = None
|
||||||
remotefiles[remote_dash_replace(o["name"].replace(prefix,""))]=int(o["bytes"])
|
remotefiles[remote_dash_replace(o["name"].replace(prefix,""))]=int(o["bytes"])
|
||||||
remotefiles_md5[remote_dash_replace(o["name"].replace(prefix,""))]=o["hash"]
|
remotefiles_md5[remote_dash_replace(o["name"].replace(prefix,""))]=o["hash"]
|
||||||
|
|
||||||
|
|
||||||
return [byte0real,byte0manifest,swift_conn,remotefiles,remotefiles_md5]
|
return [byte0real,byte0manifest,swift_conn,remotefiles,remotefiles_md5,remotefiles_xobj]
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def list_compute_correct_names_for_enctyption (objects,prefix):
|
def list_compute_correct_names_for_encryption (objects,prefix):
|
||||||
|
|
||||||
remotefiles = {}
|
remotefiles = {}
|
||||||
|
list_enc_old = []
|
||||||
|
|
||||||
for o in objects :
|
for o in objects :
|
||||||
if o["content_type"] != "application/directory":
|
if o["content_type"] != "application/directory":
|
||||||
@@ -116,9 +122,11 @@ def list_compute_correct_names_for_enctyption (objects,prefix):
|
|||||||
if encrypted_name_only in remotefiles.keys():
|
if encrypted_name_only in remotefiles.keys():
|
||||||
if int(remotefiles[encrypted_name_only].split("_xg10v10_")[3]) < int(full.split("_xg10v10_")[3]) :
|
if int(remotefiles[encrypted_name_only].split("_xg10v10_")[3]) < int(full.split("_xg10v10_")[3]) :
|
||||||
remotefiles[encrypted_name_only] = full
|
remotefiles[encrypted_name_only] = full
|
||||||
|
else:
|
||||||
|
list_enc_old.append(full)
|
||||||
else:
|
else:
|
||||||
remotefiles[encrypted_name_only] = full
|
remotefiles[encrypted_name_only] = full
|
||||||
return remotefiles
|
return [remotefiles,list_enc_old]
|
||||||
|
|
||||||
|
|
||||||
def list_compute_manifest (fail_tries,objects,swift_conn,swift_container,prefix):
|
def list_compute_manifest (fail_tries,objects,swift_conn,swift_container,prefix):
|
||||||
@@ -223,3 +231,45 @@ def set_dash():
|
|||||||
if platform.system() == "Windows":
|
if platform.system() == "Windows":
|
||||||
return "\\"
|
return "\\"
|
||||||
return "/"
|
return "/"
|
||||||
|
|
||||||
|
|
||||||
|
def delete_object (swift_conn,swift_container,object,manifest,fail_tries):
|
||||||
|
for fail_tries_counter in range (fail_tries) :
|
||||||
|
try:
|
||||||
|
print("Deleting " + swift_container + "/" + object)
|
||||||
|
swift_conn.delete_object(swift_container,object)
|
||||||
|
if manifest != None:
|
||||||
|
segment_container = manifest.split("/")[0]
|
||||||
|
prefix = manifest.replace(segment_container + "/","")
|
||||||
|
manifest_downloaded = False
|
||||||
|
for fail_tries_counter1 in range (fail_tries) :
|
||||||
|
try:
|
||||||
|
if not manifest_downloaded:
|
||||||
|
print ("Downloading remote list for " + segment_container + " with prefix " +prefix + " ... ")
|
||||||
|
headers,objects = swift_conn.get_container(segment_container, prefix =prefix,full_listing=True )
|
||||||
|
manifest_downloaded = True
|
||||||
|
for o in objects :
|
||||||
|
print("Deleting " + segment_container + "/" + o["name"])
|
||||||
|
swift_conn.delete_object(segment_container,o["name"])
|
||||||
|
except Exception as e:
|
||||||
|
print("Exception during deletion of manifest files")
|
||||||
|
print(e)
|
||||||
|
time.sleep(1)
|
||||||
|
if fail_tries_counter1 == fail_tries - 1 :
|
||||||
|
print("Maximum tries reached. Can't delete " + swift_container + "/" + object +" manifest files.Skipping")
|
||||||
|
else:
|
||||||
|
swift_conn = authentication.set_authentication ()
|
||||||
|
else :
|
||||||
|
break
|
||||||
|
except Exception as e:
|
||||||
|
print("Exception during deletion of file")
|
||||||
|
print(e)
|
||||||
|
time.sleep(1)
|
||||||
|
if fail_tries_counter == fail_tries - 1 :
|
||||||
|
print("Maximum tries reached. Can't delete " + swift_container + "/" + object +".Skipping")
|
||||||
|
else:
|
||||||
|
swift_conn = authentication.set_authentication ()
|
||||||
|
else :
|
||||||
|
break
|
||||||
|
return swift_conn
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user