Python Code Examples

Selenium

import os
import pandas as pd
import time
from selenium import webdriver
from fake_useragent import UserAgent

abspath = os.path.abspath(
    r"C:\Program Files\Google\Chrome\Application\chromedriver.exe"
)
headers = {"User-Agent": UserAgent().random}

browser = webdriver.Chrome(executable_path=abspath)
url = "https://weather.cma.cn/web/weather/59493.html"
browser.get(url)
time.sleep(10)
searchelement = browser.find_element_by_id("hourTable_0").get_attribute("outerHTML")
hourly_weather = pd.read_html(searchelement)

time.sleep(10)
browser.quit()

Plot RDF

import rdflib
from rdflib.extras.external_graph_libs import rdflib_to_networkx_multidigraph
import networkx as nx
import matplotlib.pyplot as plt

url = '/home/r2d2/dtlab/dtlab.ttl'

g = rdflib.Graph()
result = g.parse(url, format='turtle')

G = rdflib_to_networkx_multidigraph(result)

# Plot Networkx instance of RDF Graph
pos = nx.spring_layout(G, scale=2)
edge_labels = nx.get_edge_attributes(G, 'r')
nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels)
nx.draw(G, with_labels=True)

#if not in interactive mode for
plt.show()

Convert Graph Format

url = '/home/r2d2/dtlab/dtlab.ttl'

from rdflib import Graph

g = Graph()
g.parse(url)
v = g.serialize(destination="dtlab.owl")

Random number within range

from scipy.stats import truncnorm
def get_truncated_normal(mean=24, sd=1, low=17, upp=30):
  return truncnorm((low-mean)/sd, (upp-mean)/sd, loc=mean, scale=sd)

data = get_truncated_normal(mean=24, sd=2, low=17, upp=30)

plt.plot(data.rvs(10000))
fig,ax = plt.subplots(nrows=1,figsize=(9,6))

ax.hist(data.rvs(10000), normed=True)
plt.show()

autoMoveMouse

# -*- coding: utf-8 -*-

import pyautogui
import time


screenWidth, screenHeight = pyautogui.size()

while True:
    last_mouse_position_Point = pyautogui.position()
    # print(last_mouse_position_Point)

    last_mouse_position = (
        str(last_mouse_position_Point)
        .replace("Point", "")
        .replace("(", "")
        .replace(")", "")
        .replace("x=", "")
        .replace(",", "")
        .replace("y=", "")
        .replace(" ", "")
    )
    # print(last_mouse_position)

    time.sleep(3)

    current_mouse_position_Point = pyautogui.position()
    # print(current_mouse_position_Point)

    current_mouse_position = (
        str(current_mouse_position_Point)
        .replace("Point", "")
        .replace("(", "")
        .replace(")", "")
        .replace("x=", "")
        .replace(",", "")
        .replace("y=", "")
        .replace(" ", "")
    )
    # print(current_mouse_position)

    if last_mouse_position == current_mouse_position:
        print("鼠标在6秒内未动,程序执行中")
        pyautogui.moveTo(screenWidth / 2, screenHeight / 2)
        pyautogui.moveTo(100, 100, duration=0.5, tween=pyautogui.linear)
        pyautogui.moveTo(00, 100, duration=0.5, tween=pyautogui.linear)
        time.sleep(1)
        pyautogui.moveTo(screenWidth / 2, screenHeight / 2)
        time.sleep(1)
    else:
        print("鼠标在移动,程序无法执行,请等待鼠标处于静止状态!")
        time.sleep(6)
    print("上一行代码无continue,代码执行中")

SQL query and MQTT send data

import paho.mqtt.client as mqtt

# set mqtt
#HOST = ""
HOST = ""
PORT = 1883

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("data/receive")


def on_message(client, userdata, msg):
    print("topic:"+msg.topic+" message:"+str(msg.payload.decode('utf-8')))


def on_subscribe(client, userdata, mid, granted_qos):
    print("On Subscribed: qos = %d" % granted_qos)

def on_publish(client, userdata, mid):
    print("Send Success")

def on_disconnect(client, userdata, rc):
    print("Disconnected")
    if rc != 0:
        print("Unexpected disconnection %s" % rc)

# set postgres
import psycopg2
import pandas as pd

# Connect to your postgres DB
conn = psycopg2.connect(database="homeassistant", user="postgres", password="homeassistant", host="0.0.0.0", port="8086")

# Open a cursor to perform database operations
cur = conn.cursor()

# Execute a query
sql = """select statistics.id, statistics_meta.statistic_id, statistics_meta.unit_of_measurement, statistics.created, statistics.start, statistics.mean, statistics.max, statistics.min
from statistics join statistics_meta on statistics.metadata_id = statistics_meta.id
where start >= NOW() - '2 hour'::INTERVAL
"""
cur.execute(sql)

# Retrieve query results
records = cur.fetchall()
df = pd.DataFrame(records, columns=['id','meta_id','unit','created','start','mean','max','min'])
data = df.to_json(orient='records')
conn.commit()
cur.close()
conn.close()

# set influxdb
from influxdb import InfluxDBClient
import pandas as pd
import calendar
import time
import datetime
import numpy as np

start = 1644768000
end = 1645524000
interval = 7200

timestamps = list(range(start,end,interval))

for i in range(len(timestamps)-1):
    starts = datetime.datetime.fromtimestamp(timestamps[i]).strftime("%Y-%m-%d %H:%M:%S")
    ends = datetime.datetime.fromtimestamp(timestamps[i+1]).strftime("%Y-%m-%d %H:%M:%S")

    client = InfluxDBClient(host='0.0.0.0', port=8086, username='homeassistant', password='homeassistant', database='homeassistant', ssl=False, verify_ssl=False)
    sql1 = '''SELECT time, domain, entity_id, value FROM "homeassistant"."autogen"."°C"''' + """ where time >= '%s' and time <= '%s'"""%(starts,ends)

    result_c = client.query(sql1)
    sql2 = '''SELECT time, domain, entity_id, value FROM "homeassistant"."autogen"."%"''' + """ where time >= '%s' and time <= '%s'"""%(starts,ends)
    result_h = client.query(sql2)
    records_c = result_c.raw['series'][0]['values']
    records_h = result_h.raw['series'][0]['values']
    df_c = pd.DataFrame(records_c, columns=['created','domain','entity_id','mean'])
    df_c['unit'] = "°C"
    df_h = pd.DataFrame(records_h, columns=['created','domain','entity_id','mean'])
    df_h['unit'] = "%"
    df = pd.concat([df_c, df_h]).reset_index(drop=True)
    df['created'] = df['created'].apply(lambda x: 1000*calendar.timegm(time.strptime(x, '%Y-%m-%dT%H:%M:%S.%fZ')))
    df['meta_id'] = df['domain'] + '.' + df['entity_id']
    df = df.drop(['domain', 'entity_id'], axis=1)
    for i in range(len(df)):
        time.sleep(0.001)
        df.loc[i,'id'] = int(round(1000*time.time()))
    df['id'] = df['id'].astype(np.int64)
    df['start'] = df['created']
    df['min'] = df['mean']
    df['max'] = df['mean']
    data = df.to_json(orient='records')

    client = mqtt.Client()
    client.username_pw_set("", "")
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_subscribe = on_subscribe
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.connect(HOST, PORT, 60)
    client.loop_start()
    client.publish("", payload=data, qos=2)
    client.loop_stop()
    client.disconnect()
    time.sleep(30)
Previous
Next