Python
by Juan Manuel González Garzón
Crash Course on Python
Ejemplos
#!/usr/bin/env python3
# ===============================================================================
# Ejemplos basicos
# ===============================================================================
def month_days(month, days):
    """Print how many days a month has.

    ``days`` is converted to text so it can be concatenated with ``month``.
    """
    suffix = " has " + str(days) + " days."
    print(month + suffix)


for _month, _days in (("June", 30), ("July", 31)):
    month_days(_month, _days)
def attempts(n):
    """Print one "Attempt k" line for k = 1..n, followed by "Done"."""
    attempt_number = 1
    while attempt_number <= n:
        print(f"Attempt {attempt_number}")
        attempt_number += 1
    print("Done")


attempts(5)
# ===============================================================================
# Ejemplos Declaración y uso de clases
# ===============================================================================
#Begin Portion 1#
import random
class Server:
    """A server that tracks the load generated by each open connection."""

    def __init__(self):
        """Start with no active connections."""
        # Maps connection_id -> load contributed by that connection.
        self.connections = {}

    def add_connection(self, connection_id):
        """Register ``connection_id`` with a randomly generated load."""
        # random.random() is in [0, 1), so the load starts at 1 and stays below 11.
        self.connections[connection_id] = random.random() * 10 + 1

    def close_connection(self, connection_id):
        """Drop ``connection_id``; raises KeyError if it is not open."""
        del self.connections[connection_id]

    def load(self):
        """Return the summed load of every open connection (0 when idle)."""
        return sum(self.connections.values())

    def __str__(self):
        """Render the current load with two decimals and a percent sign."""
        return f"{self.load():.2f}%"
#End Portion 1#
#Begin Portion 2#
class LoadBalancing:
    """Spread connections over a growing pool of ``Server`` instances."""

    def __init__(self):
        """Start the pool with a single server and no connections."""
        # Maps connection_id -> the Server that holds that connection.
        self.connections = {}
        self.servers = [Server()]

    def add_connection(self, connection_id):
        """Attach ``connection_id`` to a randomly chosen server."""
        chosen = random.choice(self.servers)
        # Remember which server owns the connection so it can be closed later.
        self.connections[connection_id] = chosen
        chosen.add_connection(connection_id)
        self.ensure_availability()

    def close_connection(self, connection_id):
        """Close ``connection_id`` on the server that owns it."""
        owner = self.connections[connection_id]
        owner.close_connection(connection_id)
        # Only forget the mapping once the server has released the connection.
        del self.connections[connection_id]

    def avg_load(self):
        """Return the mean load across all servers in the pool."""
        combined = sum(server.load() for server in self.servers)
        return combined / len(self.servers)

    def ensure_availability(self):
        """Add servers until the average load is no higher than 50."""
        while self.avg_load() > 50:
            self.servers.append(Server())

    def __str__(self):
        """Return the per-server loads as a bracketed, comma-separated list."""
        return "[{}]".format(",".join(map(str, self.servers)))
#End Portion 2#
# Demo: open 20 connections (ids 0-19) on a fresh load balancer and
# print the resulting load of each server.
l = LoadBalancing()
for connection in range(20):
    l.add_connection(connection)
print(l)
Proyecto
#!/bin/python3
# Here are all the installs and imports you will need for your word cloud script and uploader widget
!pip install wordcloud
!pip install fileupload
!pip install ipywidgets
!jupyter nbextension install --py --user fileupload
!jupyter nbextension enable --py fileupload
import wordcloud
import numpy as np
from matplotlib import pyplot as plt
from IPython.display import display
import fileupload
import io
import sys
def _upload():
    """Display a file-upload widget and capture the uploaded text.

    When the widget's ``data`` trait changes, the payload is decoded as
    UTF-8, its size is printed, and the text is stored in the module-level
    global ``file_contents``.
    """
    _upload_widget = fileupload.FileUploadWidget()

    def _cb(change):
        """Decode the upload, report its size and publish the text."""
        global file_contents
        decoded = io.StringIO(change['owner'].data.decode('utf-8'))
        filename = change['owner'].filename
        # Size is the decoded character count divided by 1024.
        size_kb = len(decoded.read()) / 2 ** 10
        print('Uploaded `{}` ({:.2f} kB)'.format(filename, size_kb))
        file_contents = decoded.getvalue()

    _upload_widget.observe(_cb, names='data')
    display(_upload_widget)


_upload()
def calculate_frequencies(file_contents):
    """Count the interesting words in a text and render them as a word cloud.

    Each whitespace-separated token is lower-cased and stripped of
    punctuation. Tokens that are purely alphabetic and not in the
    uninteresting-word set are counted.

    Args:
        file_contents: The text to analyse, as a single string.

    Returns:
        The value of ``WordCloud.to_array()`` for a cloud generated from the
        computed frequencies.
    """
    # Characters removed from every token. The backslash is escaped explicitly
    # (the original "\," was an invalid escape); the character set is unchanged.
    punctuations = '''!()-[]{};:'"\\,<>./?@#$%^&*_~'''
    # A set gives constant-time membership tests inside the loop.
    uninteresting_words = {
        "the", "a", "to", "if", "is", "it", "of", "and", "or", "an", "as", "i", "me", "my",
        "we", "our", "ours", "you", "your", "yours", "he", "she", "him", "his", "her", "hers", "its", "they", "them",
        "their", "what", "which", "who", "whom", "this", "that", "am", "are", "was", "were", "be", "been", "being",
        "have", "has", "had", "do", "does", "did", "but", "at", "by", "with", "from", "here", "when", "where", "how",
        "all", "any", "both", "each", "few", "more", "some", "such", "no", "nor", "too", "very", "can", "will", "just",
    }
    strip_punctuation = str.maketrans('', '', punctuations)

    frequencies = {}
    # split() with no argument breaks on any run of whitespace, so words
    # separated by newlines or tabs are counted instead of being discarded.
    for word in file_contents.split():
        clean_word = word.lower().translate(strip_punctuation)
        if clean_word.isalpha() and clean_word not in uninteresting_words:
            frequencies[clean_word] = frequencies.get(clean_word, 0) + 1

    # Build the word cloud from the counted frequencies.
    cloud = wordcloud.WordCloud()
    cloud.generate_from_frequencies(frequencies)
    return cloud.to_array()
# Display your wordcloud image.
# file_contents is the module-level global assigned by the upload callback,
# so a file must have been uploaded before this runs.
myimage = calculate_frequencies(file_contents)
plt.imshow(myimage, interpolation = 'nearest')
# Hide the axes so only the image is shown.
plt.axis('off')
plt.show()
Interact with the Operating System
Week1
healthy_checks.py
#!/usr/bin/env python3
import shutil
import psutil
from network import *
def check_disk_usage(disk):
    """Return True when more than 20% of ``disk`` is still free."""
    total, _used, free = shutil.disk_usage(disk)
    free_percent = free / total * 100
    return free_percent > 20
def check_cpu_usage():
    """Return True when ``psutil.cpu_percent(1)`` reports usage below 75."""
    return psutil.cpu_percent(1) < 75
# If there's not enough disk, or not enough CPU, print an error.
# The network checks (check_localhost / check_connectivity, pulled in by the
# wildcard import from network) only run when both resource checks pass.
if not check_disk_usage('/') or not check_cpu_usage():
    print("ERROR!")
elif check_localhost() and check_connectivity():
    print("Everything ok")
else:
    print("Network checks failed")
network.py
#!/usr/bin/env python3
import requests
import socket
def check_localhost():
    """Return True when the name ``localhost`` resolves to 127.0.0.1."""
    return socket.gethostbyname('localhost') == "127.0.0.1"
def check_connectivity():
    """Return True when an HTTP GET to www.google.com answers with status 200.

    Returns:
        False when the request fails (connection error, timeout, ...) or
        the response status is not 200; True otherwise.
    """
    try:
        # Without a timeout the call can block indefinitely on a dead network.
        response = requests.get("http://www.google.com", timeout=5)
    except requests.RequestException:
        # An unreachable network is a failed check, not a crash.
        return False
    return response.status_code == 200
Week2
csv_operations.py
#!/usr/bin/env python3
import csv
hosts = [["workstation.local", "192.168.25.46"], ["webserver.cloud", "10.2.5.6"]]
# newline="" lets the csv module control line endings itself, as the csv
# documentation recommends for files passed to reader/writer objects.
with open("example.csv", "w", newline="") as hosts_csv:
    writer = csv.writer(hosts_csv)
    writer.writerows(hosts)

# Read the rows back; the context manager closes the file even on error.
with open("example.csv", newline="") as f:
    csv_f = csv.reader(f)
    for row in csv_f:
        name, ip_address = row

# Read and write using dictionaries
users = [{"name": "Sol Mansi", "username": "solm", "department": "IT infraestructure"},
         {"name": "Lio Nelson", "username": "lion", "department": "User Experience Research"},
         {"name": "Charlie grey", "username": "greyc", "department": "development"}]
keys = ["name", "username", "department"]
with open("example2.csv", "w", newline="") as users_csv:
    writer = csv.DictWriter(users_csv, fieldnames=keys)
    writer.writeheader()
    writer.writerows(users)

with open("example2.csv", newline="") as f2:
    reader = csv.DictReader(f2)
    for row in reader:
        print("{} trabaja en {}".format(row["name"], row["department"]))
directories_operations.py
#!/usr/bin/env python3
import os
# Create the working directory; exist_ok avoids a crash when the script is re-run.
os.makedirs("new_dir", exist_ok=True)
os.chdir("new_dir")
current_directory = os.getcwd()

# os.listdir does not expand "~", so resolve the home directory explicitly.
directory = os.path.expanduser("~/")
for name in os.listdir(directory):
    fullname = os.path.join(directory, name)
    if os.path.isdir(fullname):
        print("{} es un directorio".format(fullname))
    else:
        print("{} es un archivo".format(fullname))
#os.path.isdir(dirname)
#os.path.isfile(filename)
files_operations.py
#!/usr/bin/env python3
import os
import datetime
# Check that the file exists before inspecting it
if os.path.exists("lol.txt"):
    # File size in bytes
    size = os.path.getsize("lol.txt")
    print(size)
    # Last-modification time (Unix timestamp) converted to a datetime
    timestamp = os.path.getmtime("lol.txt")
    modificationdate = datetime.datetime.fromtimestamp(timestamp)
    print(modificationdate)
    # Absolute path
    abspath = os.path.abspath("lol.txt")
    print(abspath)
    # Remove the file last: removing it first made every query above fail
    os.remove('lol.txt')
# Rename a file
os.rename("old_name.txt", "new_name.txt")
read_write.py
#!/usr/bin/env python3
# Read a file, option 1: load every line into a list.
# The context manager closes the file even if readlines() raises.
with open('lol.txt') as file:
    lines = file.readlines()
print(lines)
print('---------------------------------')
# Read a file, option 2: iterate line by line
with open('lol.txt') as file:
    for line in file:
        print(line.strip())
# Write to a file
# Modes:
# r: read only
# w: write only -> replaces the content of the file
# a: append
# r+: read-write
# b: binary mode
with open('writingFile.txt', 'w') as file:
    # write() returns the number of characters written
    count = file.write('mmmmmmmmmm')
print(count)
Tarea
generate_report.py
#!/usr/bin/env python3
import csv
def read_employees(csv_file_location):
    """Load every row of an employee CSV file as a dictionary.

    Args:
        csv_file_location: Path of the CSV file; its first row is the header.

    Returns:
        A list with one mapping per data row, keyed by the header fields.
        Spaces that follow a delimiter are dropped (``skipinitialspace``).
    """
    csv.register_dialect('empDialect', skipinitialspace=True, strict=True)
    # The context manager closes the file; the original left the handle open.
    with open(csv_file_location, newline='') as employee_csv:
        return list(csv.DictReader(employee_csv, dialect='empDialect'))
def process_data(employee_list):
    """Count how many employees belong to each department.

    Args:
        employee_list: Iterable of mappings that each have a 'Department' key.

    Returns:
        A dict mapping each department name to its number of employees.

    Raises:
        KeyError: If an entry has no 'Department' key.
    """
    # Local import keeps this block self-contained.
    from collections import Counter

    # One pass over the data instead of one list.count() scan per department.
    return dict(Counter(employee['Department'] for employee in employee_list))
def write_report(dictionary, report_file):
    """Write one ``key:value`` line per entry, sorted by key.

    Args:
        dictionary: Mapping to report on.
        report_file: Path of the file to (over)write.
    """
    # Write-only mode is enough, and the with-block closes the file,
    # so the explicit close() of the original was redundant.
    with open(report_file, "w") as f:
        f.writelines(
            str(k) + ':' + str(dictionary[k]) + '\n' for k in sorted(dictionary)
        )
# Pipeline: load employees.csv, count employees per department,
# and write the counts to test_report.txt.
employee_list = read_employees('employees.csv')
dictionary = process_data(employee_list)
write_report(dictionary, 'test_report.txt')
employees.csv
Full Name, Username, Department
Audrey Miller, audrey, Development
Arden Garcia, ardeng, Sales
Bailey Thomas, baileyt, Human Resources
Blake Sousa, sousa, IT infrastructure
Cameron Nguyen, nguyen, Marketing
Charlie Grey, greyc, Development
Chris Black, chrisb, User Experience Research
Courtney Silva, silva, IT infrastructure
Darcy Johnsonn, darcy, IT infrastructure
Elliot Lamb, elliotl, Development
Emery Halls, halls, Sales
Flynn McMillan, flynn, Marketing
Harley Klose, harley, Human Resources
Jean May Coy, jeanm, Vendor operations
Kay Stevens, kstev, Sales
Lio Nelson, lion, User Experience Research
Logan Tillas, tillas, Vendor operations
Micah Lopes, micah, Development
Sol Mansi, solm, IT infrastructure
Week3
#!/usr/bin/env python3
import re
# "." stands for any single character and IGNORECASE lets "p" match "P",
# so the pattern finds "Pang" at the start of "Pangea".
print(re.search(r"p.ng", "Pangea", re.IGNORECASE))
Tarea
#!/usr/bin/env python3
#Import libraries
import csv
import re
"""
This script first replaces the old domain with the new domain in every matching email address, and then stores the updated rows in a new CSV file.
"""
def contains_domain(address, domain):
    """Tell whether an email address ends with the given domain.

    Args:
        address: The email address to inspect.
        domain: The literal domain expected after the "@".

    Returns:
        True if ``address`` is ``<local-part>@<domain>``, False otherwise.
    """
    # re.escape keeps the "." in the domain literal; unescaped it would match
    # any character (e.g. "abcxedu" would pass for "abc.edu").
    domain_pattern = r'[\w\.-]+@' + re.escape(domain) + '$'
    return re.match(domain_pattern, address) is not None
def replace_domain(address, old_domain, new_domain):
    """Replace a trailing ``old_domain`` in ``address`` with ``new_domain``.

    Args:
        address: The email address to rewrite.
        old_domain: Literal domain to look for at the end of the address.
        new_domain: Literal text to put in its place.

    Returns:
        The rewritten address, or ``address`` unchanged when it does not end
        with ``old_domain``.
    """
    # Escape the domain so "." only matches a literal dot.
    old_domain_pattern = re.escape(old_domain) + '$'
    # A callable replacement inserts new_domain verbatim (no backslash or
    # group-reference processing).
    return re.sub(old_domain_pattern, lambda _match: new_domain, address)
def main():
    """Rewrite old-domain email addresses in the user CSV and save the result.

    Reads the hard-coded input CSV, replaces the old domain with the new one
    in the ' Email Address' column of every data row, and writes all rows
    (header included) to the hard-coded report file.

    Raises:
        IndexError: If the input file is empty.
        ValueError: If the header has no ' Email Address' column.
    """
    old_domain, new_domain = 'abc.edu', 'xyz.edu'
    csv_file_location = '/home/student-03-e34fb3ff2b39/data/user_emails.csv'
    report_file = '/home/student-03-e34fb3ff2b39/data' + '/updated_user_emails.csv'

    with open(csv_file_location, 'r') as f:
        user_data_list = list(csv.reader(f))

    # The header fields carry a leading space because the file uses ", " separators.
    email_key = ' ' + 'Email Address'
    email_index = user_data_list[0].index(email_key)

    # Update each row in a single pass. The original built two parallel lists
    # and re-scanned them for every row, shadowing old_domain/new_domain.
    for user in user_data_list[1:]:
        email_address = user[email_index].strip()
        if contains_domain(email_address, old_domain):
            user[email_index] = ' ' + replace_domain(email_address, old_domain, new_domain)

    with open(report_file, 'w+') as output_file:
        writer = csv.writer(output_file)
        # The original created the writer but never wrote the rows.
        writer.writerows(user_data_list)
Week4
Tarea de la semana 4
#!/usr/bin/env python3
import sys
import os
import re
def error_search(log_file, error=None):
    """Collect the log lines that mention an error and every search word.

    Args:
        log_file: Path of the UTF-8 log file to scan.
        error: Space-separated search words. When omitted, the user is
            prompted for them, as before.

    Returns:
        The matching lines, in file order. A line matches when, compared
        case-insensitively, it contains "error" and every search word. Each
        word is used as a regular expression.
    """
    if error is None:
        error = input("What is the error? ")

    # The patterns do not depend on the line, so build them once up front.
    error_patterns = ["error"] + [word.lower() for word in error.split(' ')]

    returned_errors = []
    with open(log_file, mode='r', encoding='UTF-8') as file:
        # Iterate lazily instead of loading the whole file with readlines().
        for log in file:
            lowered = log.lower()
            if all(re.search(pattern, lowered) for pattern in error_patterns):
                returned_errors.append(log)
    return returned_errors
def file_output(returned_errors, output_path=None):
    """Write the collected error lines to a log file.

    Args:
        returned_errors: Iterable of lines, written as-is (no newline added).
        output_path: Destination file. Defaults to
            ``~/data/errors_found.log``, the original hard-coded location.
    """
    if output_path is None:
        output_path = os.path.expanduser('~') + '/data/errors_found.log'
    # The with-block closes the file; the extra close() was redundant.
    with open(output_path, 'w') as file:
        file.writelines(returned_errors)
# Script entry point: the first command-line argument is the log file to scan.
if __name__ == "__main__":
    log_file = sys.argv[1]
    returned_errors = error_search(log_file)
    file_output(returned_errors)
    # Exit explicitly with a success status.
    sys.exit(0)
Week5 Testing
# A failing assert raises AssertionError carrying the message after the comma.
assert False, "Oh no! This assertion failed!"
# Assertions are stripped when Python runs in optimized mode:
# python -O script.py
#!/usr/bin/env python3
import unittest
class TestStringMethods(unittest.TestCase):
    """Checks a few built-in ``str`` methods."""

    def test_upper(self):
        """``upper`` upper-cases every letter."""
        shouted = 'foo'.upper()
        self.assertEqual(shouted, 'FOO')

    def test_isupper(self):
        """``isupper`` is true only when all cased letters are upper case."""
        self.assertTrue('FOO'.isupper())
        self.assertFalse('Foo'.isupper())

    def test_split(self):
        """``split`` breaks on whitespace and rejects a non-string separator."""
        s = 'hello world'
        words = s.split()
        self.assertEqual(words, ['hello', 'world'])
        # check that s.split fails when the separator is not a string
        self.assertRaises(TypeError, s.split, 2)
@unittest.skip("showing class skipping")
class MySkippedTestCase(unittest.TestCase):
    """Example of skipping an entire test class with a decorator."""

    def test_not_run(self):
        """Placeholder test inside the skipped class."""
        return None
class ExpectedFailureTestCase(unittest.TestCase):
    """Example of a test that is marked as an expected failure."""

    @unittest.expectedFailure
    def test_fail(self):
        """Compare 1 with 0, which fails with the message "broken"."""
        first, second = 1, 0
        self.assertEqual(first, second, "broken")


if __name__ == '__main__':
    unittest.main()
from unittest import IsolatedAsyncioTestCase
# Names of the fixture/test hooks, recorded in the order they run.
events = []


class Test(IsolatedAsyncioTestCase):
    """Async test case in which every hook appends its own name to ``events``.

    NOTE(review): AsyncConnection is neither defined nor imported in the
    visible code; it must be provided before this example can run.
    """

    def setUp(self):
        # Synchronous set-up hook.
        events.append("setUp")

    async def asyncSetUp(self):
        # Asynchronous set-up hook: opens the connection used by the test.
        self._async_connection = await AsyncConnection()
        events.append("asyncSetUp")

    async def test_response(self):
        events.append("test_response")
        response = await self._async_connection.get("https://example.com")
        self.assertEqual(response.status_code, 200)
        # Register an asynchronous cleanup callback.
        self.addAsyncCleanup(self.on_cleanup)

    def tearDown(self):
        # Synchronous tear-down hook.
        events.append("tearDown")

    async def asyncTearDown(self):
        # Asynchronous tear-down hook: closes the connection opened in asyncSetUp.
        await self._async_connection.close()
        events.append("asyncTearDown")

    async def on_cleanup(self):
        # Cleanup callback registered by test_response.
        events.append("cleanup")


if __name__ == "__main__":
    unittest.main()
import unittest
class WidgetTestCase(unittest.TestCase):
    """Tests for ``Widget`` sizing, using a fresh widget per test."""

    def setUp(self):
        """Create the widget under test."""
        self.widget = Widget('The widget')

    def test_default_widget_size(self):
        """A new widget reports a size of (50, 50)."""
        self.assertEqual(self.widget.size(), (50, 50), msg='incorrect default size')

    def test_widget_resize(self):
        """After resize(100, 150) the widget reports the new size."""
        self.widget.resize(100, 150)
        self.assertEqual(self.widget.size(), (100, 150), msg='wrong size after resize')

    def tearDown(self):
        """Dispose of the widget created in setUp."""
        self.widget.dispose()
def suite():
    """Return a TestSuite holding both widget tests, default-size test first."""
    widget_suite = unittest.TestSuite()
    for test_name in ('test_default_widget_size', 'test_widget_resize'):
        widget_suite.addTest(WidgetTestCase(test_name))
    return widget_suite


if __name__ == '__main__':
    runner = unittest.TextTestRunner()
    runner.run(suite())
================
import re
my_txt = "An investment in knowledge pays the best interest."


def LetterCompiler(txt):
    """Return each lowercase a, b or c in ``txt`` that is followed by a character.

    Matches do not overlap: the character after a hit is consumed with it.
    """
    pattern = re.compile(r'([a-c]).')
    return pattern.findall(txt)


print(LetterCompiler(my_txt))
import unittest
class TestCompiler(unittest.TestCase):
    """Basic check of ``LetterCompiler`` on an ordinary sentence."""

    def test_basic(self):
        """The sentence yields its a/b/c letters in order of appearance."""
        found = LetterCompiler("The best preparation for tomorrow is doing your best today.")
        self.assertEqual(found, ['b', 'a', 'a', 'b', 'a'])
class TestCompiler2(unittest.TestCase):
    """Check of ``LetterCompiler`` on space-separated single letters."""

    def test_two(self):
        """Only the lowercase b and c match; the capital A does not."""
        found = LetterCompiler("A b c d e f g h i j k l m n o q r s t u v w x y z")
        self.assertEqual(found, ['b', 'c'])


# EDGE CASES HERE
# exit=False keeps the interpreter alive after the tests run.
unittest.main(argv = ['first-arg-is-ignored'], exit = False)
================
python -m unittest -v tests/test_something.py
python -m unittest -v test_module
def linear_search(list, key):
    """Return the index of the first item equal to ``key``, or -1 if absent."""
    matches = (index for index, item in enumerate(list) if item == key)
    return next(matches, -1)
def binary_search(list, key):
    """Return an index where ``key`` occurs in the sorted ``list``, or -1.

    The search window [low, high] is halved on every iteration.
    """
    low, high = 0, len(list) - 1
    while low <= high:
        middle = (low + high) // 2
        candidate = list[middle]
        if candidate == key:
            return middle
        if candidate > key:
            # key can only be in the lower half
            high = middle - 1
        if candidate < key:
            # key can only be in the upper half
            low = middle + 1
    return -1
#!/usr/bin/env python3
# validations.py
def validateUser(username, minlength):
    """Check that a username is long enough and purely alphanumeric.

    Returns True when ``username`` has at least ``minlength`` characters and
    all of them are alphanumeric; False otherwise.

    Raises:
        AssertionError: If ``username`` is not exactly of type ``str``.
        ValueError: If ``minlength`` is smaller than 1.
    """
    assert type(username) == str, "username must be a string"
    if minlength < 1:
        raise ValueError("minlength must be at least 1")
    if len(username) < minlength:
        return False
    return username.isalnum()
#!/usr/bin/env python3
import unittest
from validations import validateUser
# test_validations.py
class TestValidateUser(unittest.TestCase):
    """Unit tests for ``validateUser``."""

    def test_valid_type(self):
        """A non-string username trips the type assertion."""
        with self.assertRaises(AssertionError):
            validateUser([], 1)

    def test_value(self):
        """A minimum length below 1 is rejected with ValueError."""
        self.assertRaises(ValueError, validateUser, '', 0)

    def test_too_short(self):
        """An empty username is shorter than the minimum of 1."""
        outcome = validateUser('', 1)
        self.assertEqual(outcome, False)

    def test_valid_users(self):
        """Digit-only and letter-only usernames are accepted."""
        for username in ('0123456789', "Juan"):
            self.assertTrue(validateUser(username, 1))


if __name__ == '__main__':
    unittest.main()
Tarea semana 5 emails.py
#!/usr/bin/env python3
import sys
import csv
def populate_dictionary(filename):
    """Build a name -> email lookup table from a CSV file.

    Args:
        filename: Path of a comma-separated file whose first column is the
            name and whose second column is the email address.

    Returns:
        A dict mapping each lower-cased name to its email address. Rows with
        fewer than two columns (for example blank lines) are skipped.
    """
    email_dict = {}
    with open(filename, newline='') as csvfile:
        for row in csv.reader(csvfile, delimiter=','):
            # Blank or truncated rows would otherwise raise IndexError.
            if len(row) < 2:
                continue
            email_dict[row[0].lower()] = row[1]
    return email_dict
def find_email(argv):
    """Return the email address for the name given on the command line.

    Args:
        argv: Argument list in ``sys.argv`` layout; items 1 and 2 are the
            first and last name.

    Returns:
        The matching email address, "No email address found" when the name
        is unknown (or has an empty address), or "Missing parameters" when
        fewer than two names were supplied.
    """
    # Only the argument access is guarded, so an IndexError raised while
    # loading the CSV is no longer misreported as "Missing parameters".
    try:
        fullname = str(argv[1] + " " + argv[2])
    except IndexError:
        return "Missing parameters"

    # Preprocess the data
    email_dict = populate_dictionary('/home/student-03-e34fb3ff2b39/data/user_emails.csv')
    # Single lookup instead of two .get() calls
    email = email_dict.get(fullname.lower())
    if email:
        return email
    return "No email address found"
def main():
    """Look up the email for the command-line arguments and print the result."""
    outcome = find_email(sys.argv)
    print(outcome)


if __name__ == "__main__":
    main()
test_emails.py
#!/usr/bin/env python3
import unittest
from emails import find_email
class EmailsTest(unittest.TestCase):
    """Unit tests for ``find_email`` using ``sys.argv``-style argument lists."""

    def test_basic(self):
        """A known full name returns its email address."""
        found = find_email([None, "Bree", "Campbell"])
        self.assertEqual(found, "breee@abc.edu")

    def test_one_name(self):
        """A single name is reported as missing parameters."""
        found = find_email([None, "John"])
        self.assertEqual(found, "Missing parameters")

    def test_two_name(self):
        """An unknown full name reports that no address was found."""
        found = find_email([None, "Roy", "Cooper"])
        self.assertEqual(found, "No email address found")


if __name__ == '__main__':
    unittest.main()
sudo apt install python3-pip
pip3 install psutil
#!/usr/bin/env python3
import psutil
import subprocess

# Query CPU, disk I/O and network I/O statistics (the results are not stored).
psutil.cpu_percent()
psutil.disk_io_counters()
psutil.net_io_counters()

# The placeholder angle brackets were left around the source path, which made
# rsync look for a directory literally named "<./data/prod>".
src = "./data/prod"  # replace with the source directory
dest = "./data/prod_backup"  # replace with the destination directory
# Arguments are passed as a list, so no shell is involved.
subprocess.call(["rsync", "-arq", src, dest])