Notas

Notas, scripts, algoritmos de uso diario y documentación de algunos proyectos.

View on GitHub
14 July 2021

Python

by Juan Manuel González Garzón

Python

Crash Course on Python

Ejemplos

#!/usr/bin/env python3

# ===============================================================================
# Ejemplos basicos
# ===============================================================================
def month_days(month, days):
    """Función donde transformo numero en texto para concatenarlo"""
    print(month + " has " + str(days) + " days.")

month_days("June",30)
month_days("July",31)

def attempts(n):
    """Hacer varios intentos"""
    x = 1
    while x <= n:
        print("Attempt " + str(x))
        x += 1
    print("Done")

attempts(5)

# ===============================================================================
# Ejemplos Declaración y uso de clases
# ===============================================================================
#Begin Portion 1#
import random

class Server:
    def __init__(self):
        """Creates a new server instance, with no active connections."""
        self.connections = {}

    def add_connection(self, connection_id):
        """Adds a new connection to this server."""
        connection_load = random.random()*10+1
        # Add the connection to the dictionary with the calculated load
        self.connections[connection_id] = connection_load

    def close_connection(self, connection_id):
        """Closes a connection on this server."""
        # Remove the connection from the dictionary
        del self.connections[connection_id]

    def load(self):
        """Calculates the current load for all connections."""
        total = 0
        # Add up the load for each of the connections
        for load in self.connections.values():
            total += load
        return total

    def __str__(self):
        """Returns a string with the current load of the server"""
        return "{:.2f}%".format(self.load())

#End Portion 1#

#Begin Portion 2#
class LoadBalancing:
    def __init__(self):
        """Initialize the load balancing system with one server"""
        self.connections = {}
        self.servers = [Server()]

    def add_connection(self, connection_id):
        """Randomly selects a server and adds a connection to it."""
        server = random.choice(self.servers)
        # Add the connection to the dictionary with the selected server
        self.connections[connection_id] = server
        # Add the connection to the server
        server.add_connection(connection_id)
        self.ensure_availability()

    def close_connection(self, connection_id):
        """Closes the connection on the the server corresponding to connection_id."""
        # Find out the right server
        server = self.connections[connection_id]
        # Close the connection on the server
        server.close_connection(connection_id)
        # Remove the connection from the load balancer
        del self.connections[connection_id]

    def avg_load(self):
        """Calculates the average load of all servers"""
        # Sum the load of each server and divide by the amount of servers
        total = 0
        for server in self.servers:
            total += server.load()
        return total/len(self.servers)

    def ensure_availability(self):
        """If the average load is higher than 50, spin up a new server"""
        while self.avg_load() > 50:
            self.servers.append(Server())

    def __str__(self):
        """Returns a string with the load for each server."""
        loads = [str(server) for server in self.servers]
        return "[{}]".format(",".join(loads))
#End Portion 2#

l = LoadBalancing()
for connection in range(20):
    l.add_connection(connection)
print(l)

Proyecto

#!/bin/python3
# Here are all the installs and imports you will need for your word cloud script and uploader widget

!pip install wordcloud
!pip install fileupload
!pip install ipywidgets
!jupyter nbextension install --py --user fileupload
!jupyter nbextension enable --py fileupload

import wordcloud
import numpy as np
from matplotlib import pyplot as plt
from IPython.display import display
import fileupload
import io
import sys

def _upload():

    _upload_widget = fileupload.FileUploadWidget()

    def _cb(change):
        global file_contents
        decoded = io.StringIO(change['owner'].data.decode('utf-8'))
        filename = change['owner'].filename
        print('Uploaded `{}` ({:.2f} kB)'.format(
            filename, len(decoded.read()) / 2 **10))
        file_contents = decoded.getvalue()

    _upload_widget.observe(_cb, names='data')
    display(_upload_widget)

_upload()

def calculate_frequencies(file_contents):
    # Here is a list of punctuations and uninteresting words you can use to process your text
    punctuations = '''!()-[]{};:'"\,<>./?@#$%^&*_~'''
    uninteresting_words = ["the", "a", "to", "if", "is", "it", "of", "and", "or", "an", "as", "i", "me", "my", \
    "we", "our", "ours", "you", "your", "yours", "he", "she", "him", "his", "her", "hers", "its", "they", "them", \
    "their", "what", "which", "who", "whom", "this", "that", "am", "are", "was", "were", "be", "been", "being", \
    "have", "has", "had", "do", "does", "did", "but", "at", "by", "with", "from", "here", "when", "where", "how", \
    "all", "any", "both", "each", "few", "more", "some", "such", "no", "nor", "too", "very", "can", "will", "just"]

    # LEARNER CODE START HERE
    frecuencies = {}
    for word in file_contents.split(' '):
        word = word.lower()
        clean_word = ""
        for letter in word:
            if letter not in punctuations:
                clean_word += letter
        if clean_word not in uninteresting_words and clean_word.isalpha():
                if clean_word not in frecuencies:
                    frecuencies[clean_word] = 0
                frecuencies[clean_word] = frecuencies[clean_word] + 1

    #wordcloud
    cloud = wordcloud.WordCloud()
    cloud.generate_from_frequencies(frecuencies)
    return cloud.to_array()

# Display your wordcloud image

myimage = calculate_frequencies(file_contents)
plt.imshow(myimage, interpolation = 'nearest')
plt.axis('off')
plt.show()

Interact with the Operating System

Week1

healthy_checks.py

#!/usr/bin/env python3
import shutil
import psutil
from network import *
def check_disk_usage(disk):
    """Verifies that there's enough free space on disk"""
    du = shutil.disk_usage(disk)
    free = du.free / du.total * 100
    return free > 20
def check_cpu_usage():
    """Verifies that there's enough unused CPU"""
    usage = psutil.cpu_percent(1)
    return usage < 75
# If there's not enough disk, or not enough CPU, print an error
if not check_disk_usage('/') or not check_cpu_usage():
    print("ERROR!")
elif check_localhost() and check_connectivity():
    print("Everything ok")
else:
    print("Network checks failed")

network

#!/usr/bin/env python3
import requests
import socket
def check_localhost():
    localhost = socket.gethostbyname('localhost')
    return localhost == "127.0.0.1"
def check_connectivity():
    request = requests.get("http://www.google.com")
    return request.status_code == 200

Week2

csv_operations.py

#!/usr/bin/env python3
import csv

hosts = [["workstation.local", "192.168.25.46"], ["webserver.cloud", "10.2.5.6"]]
with open("example.csv", "w") as hosts_csv:
	writer = csv.writer(hosts_csv)
	writer.writerows(hosts)

f = open("example.csv")
csv_f = csv.reader(f)
for row in csv_f:
	name, ip_address = row
f.close()

# Leer y escribir usando diccionarios

users = [{"name": "Sol Mansi", "username": "solm", "department": "IT infraestructure"},
	{"name": "Lio Nelson", "username": "lion", "department": "User Experience Research"},
	{"name": "Charlie grey", "username": "greyc", "department": "development"}]

keys = ["name", "username", "department"]

with open("example2.csv", "w") as users_csv:
	writer = csv.DictWriter(users_csv, fieldnames=keys)
	writer.writeheader()
	writer.writerows(users)

with open("example2.csv") as f2:
	reader = csv.DictReader(f2)
	for row in reader:
		print("{} trabaja en {}".format(row["name"], row["department"]))

directories_operations.py

#!/usr/bin/env python3
import os

os.mkdir("new_dir")
os.chdir("new_dir")
current_directory = os.getcwd()

directory = "~/"
for name in os.listdir(directory):
	fullname = os.path.join(directory, name)
	if os.path.isdir(fullname):
		print("{} es un directorio".format(fullname))
	else:
		print("{} es un archivo".format(fullname))

#os.path.isdir(dirname)
#os.path.isfile(filename)

files_operations.py

#!/usr/bin/env python3
import os
import datetime

# Remover archivo
os.remove('lol.txt')

# Renombrar archivos
os.rename("old_name.txt", "new_name.txt")

# Verificar existencia
os.path.exists("lol.txt")

# Ver tamaño de un archivo
size = os.path.getsize("lol.txt")
print(size)

# Ver el unix timestamp de un archivo
timestamp = os.path.getmtime("lol.txt")
modificationdate = datetime.datetime.fromtimestamp(timestamp)
print(modificationdate)

# Ver el Absolute path
abspath = os.path.abspath("lol.txt")
print(abspath)

read_write.py

#!/usr/bin/env python3

# Leer un archivo 1
file = open('lol.txt')
lines = file.readlines()
file.close()
print(lines)

print('---------------------------------')

# Leer un archivo 2
with open('lol.txt') as file:
	for line in file:
		print(line.strip())

# Escribir en un archivo
# Modos:
# 	r:	read only
#	w:	write only  -> remplaza el contenido del archivo
#	a:	append
#	r+:	read-write
#	b:	binary mode
with open('writingFile.txt', 'w') as file:
	count = file.write('mmmmmmmmmm')
	print(count)

Tarea

generate_report.py

#!/usr/bin/env python3
import csv

def read_employees(csv_file_location):
    csv.register_dialect('empDialect', skipinitialspace=True, strict=True)
    employee_file = csv.DictReader(open(csv_file_location), dialect = 'empDialect')
    employee_list = []
    for data in employee_file:
        employee_list.append(data)
    return employee_list

def process_data(employee_list):
    department_list = []
    for employee_data in employee_list:
        department_list.append(employee_data['Department'])
    department_data = {}
    for department_name in set(department_list):
        department_data[department_name] = department_list.count(department_name)
    return department_data

def write_report(dictionary, report_file):
    with open(report_file, "w+") as f:
        for k in sorted(dictionary):
            f.write(str(k)+':'+str(dictionary[k])+'\n')
        f.close()

employee_list = read_employees('employees.csv')
dictionary = process_data(employee_list)
write_report(dictionary, 'test_report.txt')

employees.csv

Full Name, Username, Department
Audrey Miller, audrey, Development
Arden Garcia, ardeng, Sales
Bailey Thomas, baileyt, Human Resources
Blake Sousa, sousa, IT infrastructure
Cameron Nguyen, nguyen, Marketing
Charlie Grey, greyc, Development
Chris Black, chrisb, User Experience Research
Courtney Silva, silva, IT infrastructure
Darcy Johnsonn, darcy, IT infrastructure
Elliot Lamb, elliotl, Development
Emery Halls, halls, Sales
Flynn McMillan, flynn, Marketing
Harley Klose, harley, Human Resources
Jean May Coy, jeanm, Vendor operations
Kay Stevens, kstev, Sales
Lio Nelson, lion, User Experience Research
Logan Tillas, tillas, Vendor operations
Micah Lopes, micah, Development
Sol Mansi, solm, IT infrastructure

Week3

#!/usr/bin/env python3
import re

print(re.search(r"p.ng", "Pangea", re.IGNORECASE))

Tarea

#!/usr/bin/env python3

#Import libraries
import csv
import re

"""
First, it changed the domain name to the new domain name. Second, it stored all the updated domain names in a new file.
"""

def contains_domain(address, domain):
  """Returns True if the email address contains the given domain,
    in the domain position, false if not."""
  domain_pattern = r'[\w\.-]+@'+domain+'$'
  if re.match(domain_pattern, address):
    return True
  return False

def replace_domain(address, old_domain, new_domain):
  """Replaces the old domain with the new domain in
    the received address."""
  old_domain_pattern = r'' + old_domain + '$'
  address = re.sub(old_domain_pattern, new_domain, address)
  return address

def main():
  """Processes the list of emails, replacing any instances of the
    old domain with the new domain."""
  old_domain, new_domain = 'abc.edu', 'xyz.edu'
  csv_file_location = '/home/student-03-e34fb3ff2b39/data/user_emails.csv'
  report_file =  '/home/student-03-e34fb3ff2b39/data' + '/updated_user_emails.csv'
  user_email_list = []
  old_domain_email_list = []
  new_domain_email_list = []

  with open(csv_file_location, 'r') as f:
    user_data_list = list(csv.reader(f))
    user_email_list = [data[1].strip() for data in user_data_list[1:]]
    for email_address in user_email_list:
      if contains_domain(email_address, old_domain):
        old_domain_email_list.append(email_address)
        replaced_email = replace_domain(email_address, old_domain, new_domain)
        new_domain_email_list.append(replaced_email)

    email_key = ' ' + 'Email Address'
    email_index = user_data_list[0].index(email_key)

    for user in user_data_list[1:]:
      for old_domain, new_domain in zip(old_domain_email_list, new_domain_email_list):
        if user[email_index] == ' ' + old_domain:
          user[email_index] = ' ' + new_domain
    f.close()

  with open(report_file, 'w+') as output_file:
    writer = csv.writer(output_file)

Week4

Tarea de la semana 4

#!/usr/bin/env python3
import sys
import os
import re

def error_search(log_file):
  error = input("What is the error? ")
  returned_errors = []
  with open(log_file, mode='r',encoding='UTF-8') as file:
    for log in  file.readlines():
      error_patterns = ["error"]
      for i in range(len(error.split(' '))):
        error_patterns.append(r"{}".format(error.split(' ')[i].lower()))
      if all(re.search(error_pattern, log.lower()) for error_pattern in error_patterns):
        returned_errors.append(log)
    file.close()
  return returned_errors

def file_output(returned_errors):
  with open(os.path.expanduser('~') + '/data/errors_found.log', 'w') as file:
    for error in returned_errors:
      file.write(error)
    file.close()

if __name__ == "__main__":
  log_file = sys.argv[1]
  returned_errors = error_search(log_file)
  file_output(returned_errors)
  sys.exit(0)

Week5 Testing

assert False, "Oh no! This assertion failed!"
# python -O script.py
#!/usr/bin/env python3
import unittest

class TestStringMethods(unittest.TestCase):

    def test_upper(self):
        self.assertEqual('foo'.upper(), 'FOO')

    def test_isupper(self):
        self.assertTrue('FOO'.isupper())
        self.assertFalse('Foo'.isupper())

    def test_split(self):
        s = 'hello world'
        self.assertEqual(s.split(), ['hello', 'world'])
        # check that s.split fails when the separator is not a string
        with self.assertRaises(TypeError):
            s.split(2)

@unittest.skip("showing class skipping")
class MySkippedTestCase(unittest.TestCase):
    def test_not_run(self):
        pass

class ExpectedFailureTestCase(unittest.TestCase):
    @unittest.expectedFailure
    def test_fail(self):
        self.assertEqual(1, 0, "broken")

if __name__ == '__main__':
    unittest.main()
from unittest import IsolatedAsyncioTestCase

events = []

class Test(IsolatedAsyncioTestCase):

    def setUp(self):
        events.append("setUp")

    async def asyncSetUp(self):
        self._async_connection = await AsyncConnection()
        events.append("asyncSetUp")

    async def test_response(self):
        events.append("test_response")
        response = await self._async_connection.get("https://example.com")
        self.assertEqual(response.status_code, 200)
        self.addAsyncCleanup(self.on_cleanup)

    def tearDown(self):
        events.append("tearDown")

    async def asyncTearDown(self):
        await self._async_connection.close()
        events.append("asyncTearDown")

    async def on_cleanup(self):
        events.append("cleanup")

if __name__ == "__main__":
    unittest.main()
import unittest

class WidgetTestCase(unittest.TestCase):
  def setUp(self):
      self.widget = Widget('The widget')

  def test_default_widget_size(self):
      self.assertEqual(self.widget.size(), (50,50),
                       'incorrect default size')

  def test_widget_resize(self):
      self.widget.resize(100,150)
      self.assertEqual(self.widget.size(), (100,150),
                       'wrong size after resize')

    def tearDown(self):
        self.widget.dispose()
def suite():
    suite = unittest.TestSuite()
    suite.addTest(WidgetTestCase('test_default_widget_size'))
    suite.addTest(WidgetTestCase('test_widget_resize'))
    return suite

if __name__ == '__main__':
    runner = unittest.TextTestRunner()
    runner.run(suite())

================

import re

my_txt = "An investment in knowledge pays the best interest."

def LetterCompiler(txt):
    result = re.findall(r'([a-c]).', txt)
    return result

print(LetterCompiler(my_txt))
import unittest

class TestCompiler(unittest.TestCase):

    def test_basic(self):
        testcase = "The best preparation for tomorrow is doing your best today."
        expected = ['b', 'a', 'a', 'b', 'a']
        self.assertEqual(LetterCompiler(testcase), expected)

class TestCompiler2(unittest.TestCase):

    def test_two(self):
        testcase = "A b c d e f g h i j k l m n o q r s t u v w x y z"
        expected = ['b', 'c']
        self.assertEqual(LetterCompiler(testcase), expected)

# EDGE CASES HERE

unittest.main(argv = ['first-arg-is-ignored'], exit = False)

================

python -m unittest -v tests/test_something.py
python -m unittest -v test_module
def linear_search(list, key):
    """If key is in the list returns its position in the list,
       otherwise returns -1."""
    for i, item in enumerate(list):
        if item == key:
            return i
    return -1

def binary_search(list, key):
    """Returns the position of key in the list if found, -1 otherwise.

    List must be sorted.
    """
    left = 0
    right = len(list) - 1
    while left <= right:
        middle = (left + right) // 2

        if list[middle] == key:
            return middle
        if list[middle] > key:
            right = middle - 1
        if list[middle] < key:
            left = middle + 1
    return -1
#!/usr/bin/env python3

# validations.py
def validateUser(username, minlength):
  assert type(username) == str, "username must be a string"
  if minlength < 1:
    raise ValueError("minlength must be at least 1")
  if len(username) < minlength:
    return False
  if not username.isalnum():
    return False
  return True
#!/usr/bin/env python3

import unittest
from validations import validateUser

# test_validations.py
class TestValidateUser(unittest.TestCase):
  def test_valid_type(self):
    self.assertRaises(AssertionError, validateUser, [], 1)

  def test_value(self):
    with self.assertRaises(ValueError):
      validateUser('',0)

  def test_too_short(self):
    self.assertEqual(validateUser('',1), False)

  def test_valid_users(self):
    self.assertTrue(validateUser('0123456789',1))
    self.assertTrue(validateUser("Juan",1))

if __name__ == '__main__':
  unittest.main()

Tarea semana 5 emails.py

#!/usr/bin/env python3

import sys
import csv

def populate_dictionary(filename):
  """Populate a dictionary with name/email pairs for easy lookup."""
  email_dict = {}
  with open(filename) as csvfile:
    lines = csv.reader(csvfile, delimiter = ',')
    for row in lines:
      name = str(row[0].lower())
      email_dict[name] = row[1]
  return email_dict

def find_email(argv):
  """ Return an email address based on the username given."""
  # Create the username based on the command line input.
  try:
    fullname = str(argv[1] + " " + argv[2])
    # Preprocess the data
    email_dict = populate_dictionary('/home/student-03-e34fb3ff2b39/data/user_emails.csv')
    # Find and print the email
    if email_dict.get(fullname.lower()):
      return email_dict.get(fullname.lower())
    else:
      return "No email address found"
  except IndexError:
    return "Missing parameters"

def main():
  print(find_email(sys.argv))

if __name__ == "__main__":
  main()

test_emails.py

#!/usr/bin/env python3
import unittest
from emails import find_email

class EmailsTest(unittest.TestCase):
  def test_basic(self):
    testcase = [None, "Bree", "Campbell"]
    expected = "breee@abc.edu"
    self.assertEqual(find_email(testcase), expected)
  def test_one_name(self):
    testcase = [None, "John"]
    expected = "Missing parameters"
    self.assertEqual(find_email(testcase), expected)
  def test_two_name(self):
    testcase = [None, "Roy","Cooper"]
    expected = "No email address found"
    self.assertEqual(find_email(testcase), expected)
if __name__ == '__main__':
  unittest.main()
sudo apt install python3-pip
pip3 install psutil
#|/usr/bin/env python3
import psutil
import subprocess

psutil.cpu_percent()
psutil.disk_io_counters()
psutil.net_io_counters()

src = "<./data/prod>" # replace <source-path> with the source directory
dest = "./data/prod_backup" # replace <destination-path> with the destination directory
subprocess.call(["rsync", "-arq", src, dest])

Creditos y referencias

Pitón iconos creados por Freepik - Flaticon

tags: