Airflow Postgres insert

airflow postgres insert ” With Postgres, every physical change made to the disk needs to be included in the WAL stream. cli() File "/Users/dlackty/. /init. Let's dive Introduction to PostgreSQL ALTER DATABASE. You can also run the command you’d like with the postgres account directly with sudo. If you are building a web application you need a database. RDS is available on several Update or insert new data, also known as a merge, in existing tables by using a staging table with a combination of updates and inserts. To use the upsert feature in PostgreSQL, you use the INSERT ON CONFLICT statement as follows: Importing a PostgreSQL database from an Amazon EC2 instance If you have data in a PostgreSQL server on an Amazon EC2 instance and want to move it to a PostgreSQL DB instance, you can use the following process. A. One fundamental rule that is usually forgotten is that the NoSQL databases are designed around queries. . operators. 12/lib/python2. Every update results in an insert and delete, and every delete results in rows being soft-marked for deletion. SHOW wal_level Debugging Hibernate application tends to be tricky as there is a time lag between application issuing a calls on hibernate api and hibernate issuing SQL to the database. Variables and macros can be used in templates (see the Jinja Templating section). Data should start flowing into Postgres tables, validate the tables using SQLAlchemy python library. It looks like airflow. . py # Set the airflow home export AIRFLOW_HOME=~/airflow # Install from pypi using pip pip install airflow # Install necessary sub-packages pip install airflow[crypto] # For connection credentials protection pip install airflow[postgres] # For PostgreSQL DBs pip install airflow[celery] # For distributed mode: celery executor pip install airflow[rabbitmq] # For message queuing and passing between airflow initdb to initiate the database where Airflow saves the workflows and their states: airflow webserver to start the web server at localhost:8080 where we can reach the web interface: airflow scheduler to start the scheduling process of the DAGs such that the individual workflows can be triggered: When you installed Postgres it also installed commands for creating new Postgres databases and users. string "airflow" no: db_instance_type: Instance type for PostgreSQL database: string "db. The Engine is the starting point for any SQLAlchemy application. 1. Another example of how seeing the code there makes me so much more comfortable using the hook and trusting that you know what the code is doing and how to $ virtualenv . In case, for some reason, it is necessary to include a value in the query string (as opposite as in a value) the merging rule is still different Have keys. Column label for index column(s). Docker images for Fargate are stored in Amazon Elastic Container Registry (Amazon ECR). 2. . Configure the password: \password. Before going forward, let’s get familiar with the terms: Transform your business with innovative solutions; Whether your business is early in its journey or well on its way to digital transformation, Google Cloud's solutions and technologies help solve your toughest challenges. pid if this happens. table_1', 2 :: SMALLINT); SELECT s. Apache Airflow version: 2. text Then I save the record with either: myDbase. datetime (2020, 2, 2), schedule_interval = "@once", default_args = default_args, catchup = False,) as dag: create_pet_table Apache Airflow Installation based on Postgresql database. providers. 
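The ON CONFLICT upsert mentioned above can be issued from an Airflow task through the Postgres hook. The sketch below assumes a pet table keyed on pet_id and a connection named postgres_default; adjust both to your own schema.

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Insert a row, or update it in place if the key already exists (hypothetical table and columns).
upsert_sql = """
    INSERT INTO pet (pet_id, name, owner)
    VALUES (%s, %s, %s)
    ON CONFLICT (pet_id)
    DO UPDATE SET name = EXCLUDED.name, owner = EXCLUDED.owner;
"""

hook = PostgresHook(postgres_conn_id="postgres_default")
hook.run(upsert_sql, parameters=(1, "Max", "Jane"))  # run() commits for us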
Uses index_label as the column name in the table. First of all, make sure python2. $ python Twit_Postgres. The difference is the lack of any custom Python scripts and Apache Airflow. . Write DataFrame index as a column. For example: export AIRFLOW_CONN_POSTGRES_DEFAULT='postgresql://postgres_user:XXXXXXXXXXXX@1. postgres_hook import PostgresHook pg_hook = PostgresHook(postgres_conn_id='postgres_bigishdata') You can take a look at the source code for the Hook and you’ll see that the PostgresHook inherits from DbApiHook, which has functions such as run, insert_rows, and get_cursor, that all come from SqlAlchemy functionality. . csv') # New DataFrame is the sum of the columns from the first DataFrame df_sum = df. hook. . authEncryptSecretKey: A random (cryptographically safe) generated string that is used for encryption and HMAC signing lakefsConfig: lakeFS config YAML stringified, as shown above. Or, download the Windows installer for the latest version of the PostgreSQL ODBC driver (psqlODBC) directly from this FTP site - https://www. . You can find package information and changelog for the provider in the documentation. Apache Airflow. Let's walk through a step by step example to export data from Postgres to SQL Server. Macros reference, Macros reference¶. parametrize( "db_operator", [param(x, id=x. I frequently have customers asking about Apache Airflow’s integration with their own applications. The app can be downloaded on postgresapp. sum, axis=0) # Load new DataFrame into PostgreSQL database con = pg8000. insert_rows(table=self. As you see, pg_dump writes its results to the standard output. TL;DR $ helm install my-release bitnami/airflow Introduction. Updating and inserting new data - Amazon Redshift AWS Documentation Amazon Redshift Database Developer Guide PostgreSQL database name. join (placeholders)) if replace: if target_fields is None: raise ValueError ("PostgreSQL ON CONFLICT upsert syntax requires column names") if replace_index is None: raise ValueError ("PostgreSQL ON CONFLICT upsert syntax requires an unique index") if isinstance (replace_index, str): Uses AWSHook to retrieve a temporary password to connect to Postgres or Redshift. Or if any of you has some idea with respect to python ecosystem it would be great In this post, I’m going to record down my journey of airflow setup. Once the app is downloaded and moved into the applications folder, the app can be opened. In the airflow_db connection object: Change the Conn Prepare¶. Airflow is an orchestra conductor to control all different data processing tools under one roof . You’ll design the data models to optimize queries for understanding what songs users are listening to. But in the case of Insert Overwrite queries, Spark has to delete the old data from the object store. If you have many ETL(s) to manage, Airflow is a must-have. database: name of the database to The idea is that when you insert a new row into the table, PostgreSQL will update the row if it already exists, otherwise, it will insert the new row. . operators. utils. It often leads people to go through an entire deployment cycle to manually push the trigger button on a live system. There is a special step prepare, which allows you to populate your data sources with batch operations. Exit from the psql shell and again from the Postgres user by pressing Ctrl+D twice, and you'll be logged in as the pi user again. PostgreSQL compares the WAL record's LSN (LSN_1) with the page LSN (LSN_0). py and Twit_Postgres. . What is it. 
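The helpers that PostgresHook inherits from DbApiHook (run, get_records, get_first, insert_rows) cover most insert-and-check work without hand-written cursor code. A minimal sketch reusing the postgres_bigishdata connection id from the snippet above; the table and columns are assumptions.

from airflow.hooks.postgres_hook import PostgresHook

pg_hook = PostgresHook(postgres_conn_id="postgres_bigishdata")

# run() executes one statement (or a list of statements) and commits
pg_hook.run(
    "INSERT INTO sample_table (id, name) VALUES (%s, %s)",
    parameters=(1, "alice"),
)

# get_records() returns the result set as a list of tuples
rows = pg_hook.get_records("SELECT id, name FROM sample_table ORDER BY id")
print(rows)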
# This works, but it is not optimal table_name = 'my_table' cur. If you wish to backup a remote system, you can pass the "-h" flag for specifying the remote host, and the "-p" flag to give the remote port: Welcome to the AWS Lambda tutorial with Python P6. And create a postgres type connection with the name redshift, using your redshift credentials. We then can run the container by specifying the postgres. Extract the files from the . In the next release of Airflow after 1. systemd unit file: The leading provider of test coverage analytics. Create a new database and user for Airflow, and name them both airflow. To create the database, follow the steps given below: Log in to PostgreSQL: sudo -u postgres psql. Airflow components: Database – By default, Airflow uses an SQLite database which is not scalable. Column(fldname) = mytextfield. You’ll do this first with a relational model in Postgres, then with a NoSQL data model with Apache Cassandra. postgres_operator import PostgresOperator from airflow. now(), After the ExecStart=, insert the path to your virtual environment like this: ExecStart=/home/dosontung007/airflow_venv/bin/airflow webserver --pid /run/airflow/webserver. Enabling service monitoring using something like runit, monit etc… Metadata Database •As the number of jobs you run on Airflow increases, so does the load on the Airflow database. # su - xxx We need to enable airflow to connect to our redshift database. I have two tables: listing(PK listing_sk, ) and. sql . operators. postgres. . To do this we go to Airflow UI -> Admin -> Connections and click on the Create tab. pem file you downloaded earlier and insert the following commands, replacing your-public-ip and your-region with the relevant values from your EC2 instance. static _generate_insert_sql (table: str, values: Tuple [str, ], target_fields: Iterable , replace: bool, ** kwargs) [source] ¶ Static helper method that generate the INSERT SQL statement. Ensure that all your new code is fully covered, and see coverage trends emerge. postgres. g. hooks. Start your airflow workers, on each worker, run: $ airflow worker. Field(fldname). macros. PostgreSQL loads the table's page from the database cluster into the shared buffer pool. 0. Inserting records into a database. You can read more about the naming conventions used in Naming conventions for provider packages airflow-scheduler; airflow-webserver; airflow-worker; airflow-init - To initialize db and create user; flower; redis; postgres - This is backend for airflow. This template provides a easy way to deploy a puckel/docker-airflow image (latest tag) on a Linux Web App with Azure database for PostgreSQL. All these technologies provide very fast end to end development of REST service with the interaction Installing PostgreSQL (OS X) While working on previous projects, I ran into so many issues while attempting to install (and re-install, and uninstall, and re-install) PostgreSQL. csv” is located. The main idea is : start from local airflow server running, python packages installation, DB configuration (connect to Airflow), integrate with Travis CI, make the processes all integrated and automative : testing(dev), docker push (dev), and airflow Heroku deployment (if and only if all checks passed). Airflow can retry a task if it fails. Airflow allows us to write jobs as Directed Acyclic Graphs (DAGs) of tasks, with each task getting something useful done, like a database INSERT. from airflow. This is not recommended. py. 
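Interpolating a table name straight into the SQL string (the "this works, but it is not optimal" pattern shown above) risks broken quoting and SQL injection. psycopg2's sql module composes identifiers safely; a sketch, with connection parameters and table layout as assumptions:

import psycopg2
from psycopg2 import sql

conn = psycopg2.connect(host="localhost", dbname="mydb", user="postgres", password="postgres")
table_name = "my_table"

# sql.Identifier quotes the table name correctly; values still go through %s placeholders
query = sql.SQL("INSERT INTO {} VALUES (%s, %s)").format(sql.Identifier(table_name))
with conn, conn.cursor() as cur:   # "with conn" commits the transaction on success
    cur.execute(query, (10, 20))
conn.close()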
I saw errors that looked like this in my logs for an app that uses a hobby-tier Heroku Postgres instance: PG::ReadOnlySqlTransaction: ERROR: cannot execute UPDATE or INSERT in a read-only transaction Resolution /var/lib/postgresql/data blind mount folder data for timescale in docker are not updated despite of explicit insert and update rows to it 21st October 2020 docker , docker-compose , docker-volume , postgresql , timescaledb PostgreSQL Operators is a database management system and open-source software that enables easy access for the public to use for relational database purposes. postgres_hook. venv/" > . See reference for available configurations. plugin aws postgresql apache-airflow aws-secrets-manager airflow-plugins Updated Apr 11, 2019 Note: With the release of Postgres 9. operators. DbApiHook Interact with Postgres. py:1150}} ERROR - (psycopg2. Our workaround is to delete airflow-monitor. One solution to this is to analyze a sample of the table. run_validation_operator ("action_list_operator", assets_to_validate = [batch_file], # This run_id can be whatever Description. io File "dags/recipe_hourly_pageviews. Editor’s note: Today’s guest post is by Jeff McCormick, a developer at Crunchy Data, showing how to deploy a PostgreSQL cluster using Helm, a Kubernetes package manager. Connect to PostgreSQL PostgreSQL is an open source relational database management system developed by a worldwide team of volunteers. $ airflow initdb PostgreSQL used the OID internally as a primary key for its system tables. . In this session we will be speaking on the motivatio Exposing REST service by connecting to the database and fetching/inserting records is very interesting and important to any project or personal skill. values ()) PostgresHook POSTGRES_DB=airflow. Installing dependencies. 1. Each Resource Manager template is licensed to you under a license agreement by its owner, not Microsoft. . Apache Airflow gives us possibility to create dynamic DAG. dbapi_hook. The following is an Apache Airflow DAG that uses the copy_readings function defined in the script above. /init. sh ENTRYPOINT [". hooks. . In this post, I won’t be going through Airflow, what it… A template for deploying Airflow Web App with PostgreSQL database on App Services This Azure Resource Manager template was created by a member of the community and not by Microsoft. Airflow Docker Web App on Linux with PostgreSQL. You use Postgres If this is the case, you can copy this solution literally. To configure Airflow to use Postgres rather than the default Sqlite3, go to airflow. Also, you need to have superuser access in order to execute the COPY statement successfully. sql. PostgreSQL database version is 10. Let’s create a topic in which we want to consume the updates from PostgreSQL. cfg. While this chart comes with an embedded stable/postgresql, this is NOT SUITABLE for production. earthquake_events (event_id, event_name, magnitude, longitude, latitude, date) VALUES (%s, %s, %s, %s, %s, %s); """ # pull event data from previous task instance using XCom: events = context ["ti"]. It integrates directly with postgres via the PostgreSQL driver but supports many other databases. sql COPY scripts/airflow/set_init. 1 Project. import uuid from datetime import datetime from airflow import DAG from airflow. postgres_operator import PostgresOperator @mark. ') CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 65b706348544 postgres " docker-entrypoint. 0 once installation is completed, type airflow version to verify. 
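A common loading pattern in Airflow is to have one task fetch records and push them to XCom, and a downstream task pull them and insert them through the hook. The sketch below is modelled on that pattern; the upstream task id, table, columns, and connection id are all assumptions.

from airflow.providers.postgres.hooks.postgres import PostgresHook

def load_events(**context):
    insert_sql = """
        INSERT INTO public.earthquake_events
            (event_id, event_name, magnitude, longitude, latitude, date)
        VALUES (%s, %s, %s, %s, %s, %s);
    """
    # pull the event dicts pushed by the upstream task (hypothetical task id and key)
    events = context["ti"].xcom_pull(task_ids="get_new_events", key="events") or []
    hook = PostgresHook(postgres_conn_id="postgres_default")
    for event in events:
        hook.run(insert_sql, parameters=tuple(event.values()))

This callable would then be wired into a DAG with a PythonOperator.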
The default database that airflow ships with is SQLite, which is practically only useful for initial testing. Run an INSERT query to insert rows marked for insert or update. FROM apache/airflow:2. I am new to airflow, been playing around with it for past 1 week. Create dir inserting_data = PostgresOperator ( task_id='inserting_data', postgres_conn_id='postgres_default', sql='INSERT INTO my_table VALUES ("val")' ) Here, we insert the value “val” in the table “my_table”. OS (e. Open the PostgreSQL>pgAdmin III tool. This is a postgres feature that allows us to write UPSERT (update or insert) queries based on a unique identifier(id in our case). While Postgres is amazing at running multiple operations at the same time, there are a few cases in which Postgres needs to block an operation using a lock. I use the Postgres driver together with the go-bindata source driver . Speaker: Daniel Poon, Wegocron has been the go-to solution for scheduling tasks on remote Linux systems. sudo -u postgres bash -c "createdb airflow" sudo -u postgres bash -c "createuser airflow --pwprompt" The createuser command will prompt you for a password for the airflow Source code for airflow. Typically, the INSERT statement returns OID with value 0. close() print('Finally, connection closed. CREATE OR REPLACE PROCEDURE will either create a new procedure, or replace an existing definition. •In this case you need to have a production ready metadata db like MySQL or Postgres. . . However the airflow-monitor. 1. To successfully query from your Airflow Deployment's Database, you'll need to set up your local Postgres connection. Before we install postgres, we should quick perform a quick update of the apt-get repository: apt-get update. You When specifying the connection as URI (in AIRFLOW_CONN_ {CONN_ID} variable) you should specify it following the standard syntax of DB connections, where extras are passed as parameters of the URI (note that all components of the URI should be URL-encoded). Amazon RDS supports the latest PostgreSQL version 11 , which includes several enhancements to performance, robustness, transaction management, query parallelism, and more. We have provided several options to the docker run command: Today I’ll talk about Apache Airflow usage, a REST API. Testing Airflow is hard There's a good reason for writing this blog post - testing Airflow code can be difficult. get_first (insert_activities_sql,parameters=insert_activities_params) This does return the Id but the record is not committed into the activities table. Airflow macros. Log into PostgreSQL and check the WAL level. Amazon RDS for PostgreSQL makes it easy to set up, operate, and scale PostgreSQL deployments in the cloud. A fork of Postgresql for “OLAP” workloads - Made for huge volumes of data (compression) - Columnar DB Column-oriented Row-oriented Aggregate on a single column Fast Slow SELECT on many columns table Fast Slow INSERT/UPDATE on single rows Slow Fast SELECT single row Slow Fast Rows Columns Redshift has a column disk layout vs Postgres Migrating from DynamoDB to Postgres The mass popularity of NoSQL databases has also resulted in it being used in all use cases without understanding the suitability for the use case. . “How can I execute a job from my application?” or “how can I get my job status in my dashboard?” are good examples of the questions I receive the most. insert_query = """ INSERT INTO public. 0-python3. output_data it will be updated, else a new record will be inserted into the sample. 
All of these steps are described in a script named insert_log. INSERT INTO users (id, name, city) SELECT id, name, city FROM users_staging3 s WHERE row_type = ‘i’ OR row_type = ‘u’; As you can see, a set of updates are done using only 3 SQL queries ( COPY, DELETE and INSERT) instead of the previous 5. . When I run a create table statement with postgres user, confere select it without any problem:--as postgres create table airflow_staging. Frequent list of commands to be used: \l -> To list all the databases \du ->TO list all the users/roles \dt -> To show all tables in the working database Airflow installs an SQLLite feature by default. Ingest data from PostgreSQL tables to Kafka topics. Accessing a Postgres Prompt Without Switching Accounts. hooks. On the Airflow UI, navigate over to Admin > Connections. Now that we have the new password and it has been changed in the connections page, we will clear the failed execution. . In Airflow 2. postgres Installation: $ brew install postgresql Connect to the database: psql -h hostname -u username -p password-d databasename. operators. pg_dump is a regular PostgreSQL client application (albeit a particularly clever one). pid. itertuples(index=False, name=None)) # insert list of tuples in destination Postgres table self. Bases: airflow. app! airflow-heroku-dev. Replace the POSTGRES-AIRFLOW-PASSWORD placeholder with the password assigned to the airflow_user user account (defined by you when creating the airflow database in Step 1). Specify a name for the Virtual Network rule, select your Editor’s note: Today’s guest post is by Jeff McCormick, a developer at Crunchy Data, showing how to deploy a PostgreSQL cluster using Helm, a Kubernetes package manager. Airflow Installation/ Postgres Setup. t2. To capture DML changes that persist in the database, set the Write-Ahead-Log (WAL) level to logical. Port is required. pid is a python process as airflow webserver. quote_ident (table_name), [10, 20]) This is now safe, but it somewhat ad-hoc. create a directory for Airflow: mkdir ~/airflow set it as the home directory: export AIRFLOW_HOME='~/airflow' refer to Airflow official website, install the current latest version, using: pip install apache-airflow==1. DbApiHook Interact with Postgres. We need to declare two postgres connections in airflow. Small logical changes (such as updating a timestamp) necessitate many on-disk changes: Postgres must insert the new tuple and update all indexes to point to that tuple. connect('postgres', password='secret_word') df_sum. . Its ideal to have separate databases for airflow and your data. You Airflow UI Connections Editing Batch PostgreSQL Connection. The data is a set of Reddit voting data, which is the the same data used in Processing arbitrary amount of data in Python. someone saw this isssue when running long tasks [2020-09-14 16:45:26,189] {{taskinstance. 7 PostgreSQL – Airflow Database Back-end . . models import DAG from datetime import timedelta postgres configuration. You will then be prompted to enter the password. pipeline/bin/activate $ pip install \ airflow [postgres] \ celery \ cryptography \ MySQL-python \ redis I'll then initialise Airflow's database and workspace. ¶ airflow. . pid is gunicorn and airflow-monitor. 0. tasks if isinstance(x, PostgresOperator)], ) def test_database_operators_have_sql(db_operator): """For all PostgresOperator task(s), verify that sql attribute returns non-empty value. Read and write in a specific partition. 
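A staging-table merge of this kind can be driven from Airflow by handing the hook a list of statements; run() executes them in order on one connection and commits at the end. The DELETE below is an assumption about how replaced rows are flagged; the INSERT mirrors the query above.

from airflow.providers.postgres.hooks.postgres import PostgresHook

merge_statements = [
    # drop old versions of rows that the staging table updates or deletes (assumed row_type flags)
    """
    DELETE FROM users
    USING users_staging3 s
    WHERE users.id = s.id AND s.row_type IN ('u', 'd');
    """,
    # bring in the new and updated rows
    """
    INSERT INTO users (id, name, city)
    SELECT id, name, city
    FROM users_staging3
    WHERE row_type = 'i' OR row_type = 'u';
    """,
]

hook = PostgresHook(postgres_conn_id="postgres_default")
hook.run(merge_statements)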
Choices include # SequentialExecutor, LocalExecutor, CeleryExecutor executor = LocalExecutor The LocalExecutor can parallelize task instances locally. airflow. The PostgreSQL object-relational database system provides reliability and data integrity. It has pretty strong monitoring, controlling and troubleshooting instruments to touch any level of Running your Apache Airflow development environment in Docker Compose. sh . Your airflow workers should be now picking up and running jobs from the airflow server. References def setUp(self): postgres = PostgresHook() with postgres. We can insert data row by row, or add multiple rows at a time. py in the same location and execute (Twit_Postgres. insert_data('s. 10 and updated in Airflow 1. In our case, if a row corresponding to a given id exists in sample. AirflowException: Argument ['owner', 'task_id'] is required The issue seems to be that some default_args are missing, but this happens very early on in the execution, basically when the BaseOperator __init__ method is invoked, thus no DAG specific default_args have been read in yet. Investigating the problem Some initial digging indicated that on executing certain queries, the async_exec method in the Ruby Postgres driver was returning nil , rather than PG::Result as ActiveRecord was expecting. dbapi_hook. Crunchy Data supplies a set of open source PostgreSQL and PostgreSQL related containers. Setting up Airflow and an Airflow database is fairly simple but can involve a few steps. Or, download the latest ODBC driver. pid. pem&sslrootcert=%2Ftmp%2Fserver-ca. You can also use md5, xdes and des. . Then, we create the table to use after during the task in Apache Airflow and the method to insert data in it. Configure airflow. com> Latest at 2021-01-25 04:21:39 by "tsunakawa. create a directory for Airflow: mkdir ~/airflow set it as the home directory: export AIRFLOW_HOME='~/airflow'. This is a painfully long process … See full list on dataquest. 0. Then specify conntype=Postgres, Schema=orders, login=oltp_read (same password) and port 5432 or whatever you’re using. 5 import datetime from airflow import DAG from airflow. In SQL, we use the INSERT command to add records/rows into table data. 7 and pip are installed and upgraded to the latest. /extra/check_init. Hello, in this post I will show you how to set up official Apache/Airflow with PostgreSQL and LocalExecutor using docker and docker-compose. The default authentication mode is set to 'ident' which means a given Linux user xxx can only connect as the postgres user xxx. replicaCount: Number of lakeFS pods: 1: resources What we want is a situation where the container image of, say, PostgreSQL version 9 can be replaced with an image of version 10 without us having to lose any data. But if your Airflow deployment looks different (e. So, this interesting exercise can be done by using PostgreSQL, Python, SQLAlchemy, and Flask. 1. Apache Airflow is a platform to programmatically author, schedule and monitor workflows. . text rs docker exec -it airbyte-source psql -U postgres -c "INSERT INTO public. insert_data('s. execute( "INSERT INTO As the DAG continues to run, you can insert additional data on the PostgreSQL side, have Airflow move the data to YugabyteDB, and track the runs in the Airflow UI by going to Browse > Dag Runs. What is Airflow? Airflow is a workflow engine which is responsible for managing and scheduling running jobs and data pipelines. Postgres is exposed on the host (dev machine, not Docker containers) port 35432. . 
•The scheduler is still the weakest link. 1:5432/postgresdb?sslmode=verify-ca&sslcert=%2Ftmp%2Fclient-cert. We also noticed that instances of this exception appeared to be correlated with INSERT queries that violated unique constraints. This means that you can do this backup procedure from any remote host that has access to the database. Python and Postgres, a match made in heaven. How to extract and interpret data from MongoDB, prepare and load MongoDB data into PostgreSQL, and keep it up-to-date. DataContext (< insert path to your great_expectations. Many of the questions asked in #postgresql revolve around using sequences in PostgreSQL. Edit rs. StringValue = mytextfield. Only after can they verify their Airflow code. # Standard imports import pandas as pd import numpy as np # Library to interface the PostgreSQL database engine import pg8000 # Extract from csv file into a DataFrame df = pd. csvkit is slow for larger files (I'm talking gigabytes of data in a file), as it's never meant to do this. Crunchy Data supplies a set of open source PostgreSQL and PostgreSQL related containers. remember this as we will use it to connect to the database in Python. g. For a more in depth comparisson you can check out this post by Citus Data. Run Stack Builder to add the PostgreSQL ODBC driver (psqlODBC) to your installation of PostgreSQL. 5, PostgreSQL allows “upserts” (update or insert) of rows into a table via the ON CONFLICT clause of the INSERT statement. Luckily, there is now an excellent tool that simplifies using PostgreSQL… Postgres. 0. ├── dags # the dag root folder │ ├── first_dag. In Airflow, each DAG has a schedule, which uses the cron format, so it can be daily, hourly, or just run every Wednesday at 3:15PM. g. takay at fujitsu. import airflow from builtins import range from airflow. cfg file found in Airflow home. What you can do is use it just to get the schema, create the table, and then use Postgresql COPY command. 2. vondra at 2ndquadrant. index bool, default True. Airflow needs a database to create tables necessary for running Airflow. postgresql. hive. takay at fujitsu. . views: stored all columns of a view, for detail view queries and long-running metrics aggregation ; slim_views: stored a subset of columns, for sub-hour metrics queries PostgreSQL uses multiversion concurrency control (MVCC) to allow greater database concurrency. cursor() as cur: for table in TABLES: cur. This specification describes the postgresql trigger that scales based on a postgresql query. . In case you need to import a CSV file from your computer into a table on the PostgreSQL database server, you can use the pgAdmin. For PostgreSQL, you will also define Fact and Dimension tables and insert data into your new tables. Can be a single line string or object. . According to Amazon, Amazon Relational Database Service (RDS) provides six familiar database engines to choose from, including Amazon Aurora, PostgreSQL, MySQL, MariaDB, Oracle Database, and SQL Server. The configuration to change the database can be easily done by just replacing the SQL Alchemy connection string value within the airflow. Do not use pool. It ensures that the jobs are ordered correctly based on dependencies and also manages the allocation of resources and failures. Find the airflow_db Connection Object. random → x in the interval [0, 1). postgres=# insert into app_users (username, password) postgres-# values ( 'myuser', crypt ('mypassword', gen_salt ('bf')) ); INSERT 0 1. You're now connected to the "test" database. 
Like Gandalf and Frodo, the two just go together. dummy_operator import DummyOperator from airflow. PostgresHook (*args, **kwargs) [source] ¶. datetime(2019,1,1)) [postgres,s3] pip install -U apache-airflow mkdir -p $HOME/docker/volumes/postgres. 5+. pg_hook = PostgresHook (postgres_conn_id='postgres_default') insert_activities_sql = "INSERT INTO activities (---) VALUES (---) RETURNING id " activity_results = pg_hook. Airflow Docker Web App on Linux with PostgreSQL: This template provides a easy way to deploy a puckel/docker-airflow image (latest tag) on a Linux Web App with Azure database for PostgreSQL: Web App on Linux with PostgreSQL: This template provides a easy way to deploy Web App on Linux with Azure database for PostgreSQL(Preview). If None is given (default) and index is True, then the index names are used. (for Airflow connection) string url or kv object. Dblink is a Postgres extension that allows you to execute queries in an external database from PostgreSQL. ALL_DONE, parameters=(uuid. This is the Docker way of upgrading software, you don’t drop inside the container and update packages using a package manager. Those “known-to-be-working” constraints are per major/minor python version. output_data table. yaml). A candidate row will only be inserted if that row does not violate any unique constraints. 1/32 trust host replication all ::1/128 trust host all mydb ::1/128 trust (7 rows) The returning at the end is a nice add-on that allows us to get the ID of the newly added row. Both are fun and easy to use, addicting, both have so many surprises and hidden gems. 9. Soft-marking identifies dead tuples that will be purged later. format(table)) cur. py) script and let it run in the background. task_id) for x in dag. HStore is a key value store within Postgres. postgres python package. sql # ENTRYPOINT SCRIPT COPY scripts/airflow/init. Demo how to deploy an Airflow app to Heroku cloud. python_operator import PythonOperator from airflow. The Crunchy PostgreSQL Container Suite includes containers that deploy, monitor, and administer the open source PostgreSQL database Using PostgreSQL App. However, the underlying generated SQL is specific to MySQL's "REPLACE INTO" syntax and is not applicable to Postgres. This will start the server. gitignore file. append: Insert new values to the existing table. . The Crunchy PostgreSQL Container Suite includes containers that deploy, monitor, and administer the open source PostgreSQL database UPSERT is a magic trick where an INSERT converts itself into an update if a row already exists with the provided primary key. This command will not modify the actual structure of the table we’re inserting to, it just adds data. env as the environment variable. com> Latest attachment (v2-0001-Prevent-FDW-insert-batching-during-cross-partitio. . 9, insert into dag_run Module Contents¶ class airflow. gitignore Initialize the git repository and create the Heroku app with a postgres add-on: $ psql -U [postgres_user_name] -h [postgres_host_name] -d [postgres_database_name] IMPORTANT: update your sql_alchemy_conn string in airflow. To run a server through the postgres app, the first step is to download the program. micro" no: db_password: PostgreSQL password. After pip installation success, you can start Airflow (including database, web server and scheduler) simply: For example, here is part of my actual account Adding a user with an encrypted password is as easy as: 1. 
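One pitfall with INSERT ... RETURNING id issued through hook helpers such as get_first() is that the value is read inside a transaction that is never committed, so the row does not persist. A workaround is to manage the connection yourself and commit explicitly; the SQL below is a placeholder, not the original (elided) statement.

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_default")
insert_sql = "INSERT INTO activities (name) VALUES (%s) RETURNING id;"  # hypothetical column

conn = hook.get_conn()
try:
    with conn.cursor() as cur:
        cur.execute(insert_sql, ("daily_load",))
        activity_id = cur.fetchone()[0]   # value produced by the RETURNING clause
    conn.commit()                          # make the insert visible to other sessions
finally:
    conn.close()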
Chances are we don’t be using a local SQLLite database when we use Airflow in production, so I’ve opted to use a Postgres database: $ pip3 install apache-airflow [postgres] In this blog post, you will learn about Airflow, and how to use Airflow Snowflake combination for efficient ETL. logical_replication to 1. Some of the ways you can avoid producing a different result - Do not use INSERT during a task re-run, an INSERT statement might lead to duplicate rows in your database. All classes for this provider package are in airflow. macros. The database is currently empty and contains no airflow # airflow root directory. environment block of docker-compose. A home for SQL queries that we frequently run on our Airflow postgres database. It is extremely useful when testing big data pipelines or when you need some amount of data pre-populated. These examples are extracted from open source projects. txt Create a . com> postgres=# insert into hba (lines) values ('host all mydb ::1/128 trust'); INSERT 0 1 postgres=# select * from hba where lines !~ '^#' and lines !~ '^$'; lines ----- local all all trust host all all 127. 0. createdb test csvsql--db postgresql: /// test--tables fy09--insert examples / realdata / FY09_EDU_Recipients_by_State. 7 USER root # INSTALL TOOLS RUN apt-get update \ && apt-get -y install libaio-dev \ && apt-get install postgresql-client RUN mkdir extra USER airflow # COPY SQL SCRIPT COPY scripts/airflow/check_init. 1. execute("CREATE TABLE {}(some_str varchar, some_num integer);" . Like the previous task, the SQL script needs to know where “processed_log. postgres_hook. Additionally, mathematical functions can be applied to the numeric price column in the products table. Provider package. Sometimes though, an incrementing ID exposed in an API or URLs reveals too much about the service, like the number of users you have (see the German Tank Problem). PostgreSQL runs on all major operating systems and has been ACID-compliant since 2001. . As part of the setup, it uses an Amazon RDS for PostgreSQL database as a metadata store, and Amazon ElastiCache for Redis as a Celery backend. The connection string is postgres://user:[email protected]:35432/db (username, password and database name are defined in the services. ') except (Exception, psycopg2. DbApiHook Interact with Postgres. CAPSTONE PROJECT Module Contents¶ class airflow. Has anyone implemented any such thing with airflow. Amazon RDS for PostgreSQL. test (a text) --as confere select * from airflow_staging. query if you need transactional integrity: the pool will dispatch every query passed to pool. Airflow - A platform to programmaticaly author, schedule and monitor data pipelines, by Airbnb. The following come for free out of the box with Airflow. xcom_pull (task_ids = 'get_new_events', key = 'events') for event in events: params = tuple (event. To properly trigger your DAG to run, make sure to insert a fixed time in the past (e. Module Contents¶ class airflow. org/ftp/odbc/versions/msi/. Bases: airflow. shipping_rate(PK shipping_rate_sk, FK listing_sk, ) There can be one or more shipping_rates POC: postgres_fdw insert batching × First at 2020-06-28 15:10:02 by Tomas Vondra <tomas. This is a provider package for postgres provider. 7. Jupyter notebook is found here. 7 and pip are installed and upgraded to the latest. In order "to speak" with a PostgreSQL database pythonistas usually use psycopg2 library. connect(). pid file is transiently becoming read-only, which sometimes prevents airflow from starting. 
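Re-running a task that issues a plain INSERT will duplicate rows, so idempotent inserts are worth the extra SQL. Below is a sketch of a PostgresOperator that stays safe on retries by upserting; the table, its unique column, and the parameter values are assumptions, and the task is assumed to sit inside a DAG definition.

from airflow.providers.postgres.operators.postgres import PostgresOperator

inserting_data = PostgresOperator(
    task_id="inserting_data",
    postgres_conn_id="postgres_default",
    sql="""
        INSERT INTO my_table (id, val)
        VALUES (%(id)s, %(val)s)
        ON CONFLICT (id) DO UPDATE SET val = EXCLUDED.val;
    """,
    parameters={"id": 1, "val": "val"},
)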
docker run --rm --name pg-docker -e POSTGRES_PASSWORD=docker -d -p 5432:5432 -v $HOME/docker/volumes/postgres:/var/lib/postgresql/data postgres. . I need to implement a rule engine kind of thing along with Airflow. Also How to SELECT * FROM with single quote. The INSERT statement also has an optional RETURNING clause that returns the information of the inserted row. format (table, target_fields_fragment, ",". . Cloud provider or hardware configuration: Dedicated Server. You can vote up the ones you like or vote down the ones you don't like, and go to the original project or source file by following the links above each example. I was curious to see if the data could be inserted into a SQL database (PostgreSQL) for further exploration. The count is the number of rows that the INSERT statement inserted successfully. This ETL (extract, transform, load) process is broken down step-by-step, and instructions are provided for using third-party tools to make the process easier to set up and manage. csv For large tables it may not be practical to process the entire table. execute( "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);", ('mock_row_content_2', 43) ) cur. Navigate to the directory that contains the airflow_key_pair. pip install "apache-airflow[postgres, password]" pip install "cryptography" pip freeze > requirements. . cfg and update this configuration to LocalExecutor: # The executor class that airflow should use. In this article I am will simple use case in which I will create a DAG to load data from cloud storage to BigQuery table. Create a new parameter group and set the value of rds. 1/32 trust host all all ::1/128 trust local replication all trust host replication all 127. 12 we also keep a set of “known-to-be-working” constraint files in the constraints-master and constraints-1-10 orphan branches. . The public DNS Name of the EC2 instance can be found on the Outputs tab Type yes when prompted after the SSH command. bash_operator import BashOperator from airflow. . Kubernetes version (if you are using kubernetes) (use kubectl version): n. Containerized Airflow is deployed on AWS Fargate. s…" 3 minutes ago Up 3 minutes 5432/tcp postgres_1 Database Setup Airflow plugin to interact with PostgreSQL database using AWS Secrets Manager to retrieve credentials. Always free for open source. hooks. 0, all operators, transfers, hooks, sensors, secrets for the postgres provider are in the airflow. A sequence should be given if The following are 30 code examples for showing how to use pyodbc. Starting the Postgres container is as simple as running the docker run command. com" <tsunakawa. 4 Querying Data from PostgreSQL . We will see below how this can be useful. Since you created a Postgres user called pi, you can access the Postgres shell from here with no credentials: $ psql test. In order to have repeatable installation, however, starting from Airflow 1. Automation of pipelines in the data analytics field is an important task and a point of discussion in every architecture design as to which automation tool will suit the purpose. pipeline $ source. CREATE PROCEDURE defines a new procedure. . pip install psycopg2 sqlalchemy In case you’re on windows, and things get notorious while installing dependencies, you can manually download a . Install airflow, install cryptography module, and set Procfile to init db on initial run. PostgreSQL - A powerful, open source object-relational database system. csv”, gets the data and loads them into a PostgreSQL database. 
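The docker run command above exposes a throwaway Postgres on localhost:5432 with user postgres and password docker, which is handy for trying inserts before pointing Airflow at it. A quick sketch of talking to that container with psycopg2 (the database name defaults to postgres; the table is an assumption):

import psycopg2

conn = psycopg2.connect(host="localhost", port=5432, dbname="postgres",
                        user="postgres", password="docker")
with conn, conn.cursor() as cur:   # commits on success
    cur.execute("CREATE TABLE IF NOT EXISTS demo (id INT PRIMARY KEY, note TEXT)")
    cur.execute("INSERT INTO demo (id, note) VALUES (%s, %s)",
                (1, "hello from docker postgres"))
conn.close()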
It is written in C programming language using libpq. If there was ever a match made in heaven, it’s using Python and Postgres together. Our topic name will be postgres_users. I am also creating additional database userdata as a backend for my data flow. You therefore have to be careful about which locks your transactions take, but with the high-level abstractions that PostgreSQL provides, it can be difficult to know exactly what will happen. table_1', 4 :: SMALLINT); !21 Splitting data in chunks / partitions @martin_loetzsch user chunk = user_id % 5 22. 1 2 3 4 5 6 7 8. trigger_rule import TriggerRule from airflow. If you are instead using PostgreSQL you can use pg_dumpall: pg_dumpall -h [ host] \ -U [ user] \ --file = postgresql_backup. InsertRecord(sTable, row) or rs. providers. We just need to configure the connection from Postgres to Redshift using a “ CREATE SERVER ” clause (defining the host, port and dbname) and then map the Postgres user with the Redshift user (using a “ CREATE USER MAPPING ” clause Insert operations on Hive tables can be of two types — Insert Into (II) or Insert Overwrite (IO). . For the sake of keeping this article short and focused on Airflow’s scheduling capabilities, please check out this link to setup Postgres and Airflow. 10. I keep getting this error when I try to insert data from my html form to Postgres database: web_1 | (Background on this error at: http://sqlalche. This project is a very basic example of fetching real time data from an open source API. . Link: Airflow_Data_Pipelines. Hello. sudo apt-get install postgresql postgresql-contrib In this post I will show you how to create a fully operational environment in 5 minutes, which will include: Apache Airflow WebServer Apache Airflow Worker Apache Airflow Scheduler Flower – is a web based tool for monitoring and administrating Celery clusters Redis – is an open source (BSD licensed), in-memory data structure store, used […] Apache Airflow is an open-source platform to programmatically author, schedule and monitor workflows. Grant privileges (like the ability to insert) to tables in the new schema to the new role postgres=# GRANT ALL ON ALL TABLES IN SCHEMA test TO xxx; Disconnect postgres=# \q Became a standard user. pem&sslkey=%2Ftmp%2Fclient-key. This Azure Resource Manager template was created by a member of the community and not by Microsoft. To be able to define a procedure, the user must have the USAGE privilege on the language. We will look into various strategies to look into how to debug hibernate. sql gzip postgres_backup. This is what the data looks like (username, link, and score). Multi-row insert INSERT INTO employee_records(emp_id,department,designation,category) values(1,’admin’,’assistant’,’contract’), values(2,’admin’,’senior assistant’’,’permanent’), values(3,’security’,’manager’’,’permanent’); -- Procedure to insert a new city CREATE OR REPLACE FUNCTION add_city (city VARCHAR (70), state CHAR (2)) RETURNS void AS $$ BEGIN INSERT INTO cities VALUES (city, state); END; $$ LANGUAGE plpgsql; You can use SELECT statement to invoke the add_city procedure: -- Add a new city SELECT add_city ('St. hooks. destination_table, rows=rows) Note: The snippet is for reference only; it has NOT been tested. We will use this file for staging in a later section. PostgresHook (*args, **kwargs) [source] ¶. Each Resource Manager template is licensed to you under a license agreement by its owner, not Microsoft. providers. . 
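Note that in a PostgreSQL multi-row insert the VALUES keyword appears only once, followed by comma-separated row tuples; repeating values(...) before each row, as in the employee_records snippet, is a syntax error. A corrected sketch, run through the hook (the connection id is an assumption):

from airflow.providers.postgres.hooks.postgres import PostgresHook

multi_row_insert = """
    INSERT INTO employee_records (emp_id, department, designation, category)
    VALUES
        (1, 'admin',    'assistant',        'contract'),
        (2, 'admin',    'senior assistant', 'permanent'),
        (3, 'security', 'manager',          'permanent');
"""
PostgresHook(postgres_conn_id="postgres_default").run(multi_row_insert)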
There are some different types of Executors in airflow, like SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, from airflow. 4 and JSONB, in most cases it becomes a better approach than simply HStore. Install Airflow. The Postgresql scaler allows for two connection options: A user can offer a full connection string (often in the form of an environment variable secret) PostgreSQL provides the utility program pg_dump for this purpose. Project 6: Api Data to Postgres. Starting with Apache Airflow to automate a PostgreSQL database on Amazon RDS. . sql If you are backing up a single database, you can exploit the Postgres "Custom" dump format, which is an already compressed and optimized backup format: Our original architecture consisted of multiple, sharded, Postgres databases, as well as Airflow workers that performed aggregation. This is not the same file as airflow. sql . /extra/set_init. Transactions within PostgreSQL are scoped to a single client and so dispatching individual queries within a single transaction across multiple, random clients will cause big problems in your app and not work. from /etc/os-release): Ubuntu 18. ') else: print('Connection not established to PostgreSQL. . I am using PostgresHook in an Airflow operator. You should make use of an external mysql or postgres database, for example, one that is managed by your cloud provider. . This feature is very useful when we would like to achieve flexibility in Airflow, to do not create many DAGs for each case but have only on DAG where we will have power to change the tasks and relationships between them dynamically. Louis', 'MO'); Apache Airflow is a platform to programmatically author, schedule, and monitor workflows. . to_sql(name='my_table', con=con) To insert data from one scheme to another, the syntax is as follows. . Relational Database purposes are actually the manageability of data to explain it in a nutshell. import psycopg2 try: conn = psycopg2. We need to install the sqlalchemy library along with the database adapter for PostgreSQL, which is psycopg2. . In Postgresql, generally an incrementing primary key is used, which SERIAL is an excellent helper type for. Go to the connections screen in the UI (through Admin) and create a new postgres connection and call this postgres_oltp. Postgres, and by extension Redshift (which based on an old version of Postgres) hasn’t had UPSERT functionality until recently. insert_data('s. RETURNING clause. table_1', 1 :: SMALLINT); SELECT s. 2. closest_ds_partition (table, ds, before = True, schema = 'default', metastore_conn_id = 'metastore_default') [source] ¶ This Go to the PostgreSQL on Azure resource, and under Connection Security settings menu VNET rules section, select Add existing virtual network. postgres import PostgresOperator default_args = {"owner": "airflow"} # create_pet_table, populate_pet_table, get_all_pets, and get_birth_date are examples of tasks created by # instantiating the Postgres Operator with DAG (dag_id = "postgres_operator_dag", start_date = datetime. Python community likes PostgreSQL as well as PHP community likes MySQL. I'm using Python to connect to a Postgres database (in Airflow, so uses psycopg2 adapter). In this case we used the Blowfish algorithm to generate the salt. The basic usage of this command is: pg_dump dbname > outfile. DatabaseError) as error: print(error) finally: if conn is not None: conn. index_label str or sequence, default None. Tagged with apacheairflow, python, docker, dockercompose. 
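When the goal is simply to skip rows that already exist rather than update them, ON CONFLICT ... DO NOTHING is enough, and it can target a named constraint such as a primary key. A sketch modelled on the attendance example; the table layout and connection id are assumptions.

from airflow.providers.postgres.hooks.postgres import PostgresHook

insert_if_absent = """
    INSERT INTO attendance (student_id, class_date, present)
    VALUES (%s, %s, %s)
    ON CONFLICT ON CONSTRAINT attendance_pkey DO NOTHING;
"""
hook = PostgresHook(postgres_conn_id="postgres_default")
# If the (assumed) primary key already exists, Postgres reports INSERT 0 0 and raises no error.
hook.run(insert_if_absent, parameters=(42, "2021-01-01", True))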
To avoid answering the same questions again and again, I thought it would be worthwhile to summarize the basic steps involving in using sequences in PostgreSQL. It contains information about the status of tasks, DAGs, Variables, connections, etc. I started digging into the Kaggle movies dataset recently, which is a collection of CSV files. They were made for each other. The three sharded Postgres databases were. If none is provided, default is used for each service. hooks. utils. Trigger Specification . py # where you put your first task Once this pipeline is saved --- and as long as you have Airflow running in the background --- your DAG will automatically get picked up by Airflow. int % 123456789, datetime. It’s “home base” for the actual database and its DBAPI, delivered to the SQLAlchemy application through a connection pool and a Dialect, which describes how to talk to a specific kind of database/DBAPI combination. PostgreSQL alter database statement is used to alter the database features like changing the ownership, change the name of the database, set the new tablespace of the database, and set the configuration parameter for the database. PostgresHook (*args, **kwargs) [source] ¶. In the case of Insert Into queries, only new data is inserted and old data is not deleted/touched. s…" 2 minutes ago Up 2 minutes 5432/tcp postgres_2 16b1dcfd09fb postgres " docker-entrypoint. In this example, a table named students will be created in Postgres and then it will be imported to the Adventureworks sample SQL Server database. In this tutorial, I have shown, how to get file name and content of the file from the S3 bucket, when AWS Engine Configuration¶. postgres_hook. I have a window with TextFields that I cycle through, assigning the text from each field to the Postgres column: row. . We also want to give an identifier to this container as it will be referenced by the The INSERT statement above just specifies the attendance_pkey constraint to be the basis for whether or not DO NOTHING should take effect. Using Postgres, I have a table with a varchar column ALLOW NULL with a default value of ‘’ (empty string). . If that constraint is violated, instead of an error, Postgres just skips the insert and displays the usual command tag: INSERT 0 0 class PostgresOperator(BaseOperator): """ Executes sql code in a specific Postgres database :param postgres_conn_id: reference to a specific postgres database :type postgres_conn_id: string :param sql: the sql code to be executed :type sql: Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. . insert_data('s. 2. . apply(np. Once apt-get has updated go ahead and download Postgres and its helpful accompanying dependencies: sudo apt-get install postgresql postgresql-contrib. execute("DROP TABLE IF EXISTS {} CASCADE;". query on the first available idle client. Replace it with UPSERT. py", line 73, in <module> dag. For scalability purposes, we can use MySQL, Postgres database aswell. PostgresHook's parent class, DbApiHook, implements upsert in its insert_rows()method with the replace=Trueflag. before we can access the database in python, we need to create the database in postgresql. PostgreSQL Insert Strategies – Performance Test; How to Insert Data [into PostgresSQL] - as Fast as Possible; Scenario. 
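Those basic steps amount to: a SERIAL (or IDENTITY) column creates and owns a sequence, every insert draws the next value from it, and RETURNING or currval exposes the generated key. A sketch with psycopg2; connection details and names are assumptions, and movies_id_seq is simply the default name PostgreSQL gives the implicit sequence.

import psycopg2

conn = psycopg2.connect(host="localhost", dbname="test", user="postgres", password="postgres")
with conn, conn.cursor() as cur:
    cur.execute("CREATE TABLE IF NOT EXISTS movies (id SERIAL PRIMARY KEY, title TEXT)")
    cur.execute("INSERT INTO movies (title) VALUES (%s) RETURNING id", ("Toy Story",))
    new_id = cur.fetchone()[0]                      # key generated by the sequence
    cur.execute("SELECT currval('movies_id_seq')")  # same value, visible in this session
    print(new_id, cur.fetchone()[0])
conn.close()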
sh"] lalligood July 31, 2019 July 31, 2019 2 Comments on Custom Airflow Operators for Loading Data Into PostgreSQL While the ETL I am responsible for takes advantage of PostgreSQL’s foreign data wrappers to simplify (avoid?) extracting from one database into another, I was recently tasked with an interesting project to track (changes in) the # convert pandas DataFrame into list of tuples rows: List[Tuple[Any, ]] = list(df. This will delete the execution record from Airflow’s database, so the next time the scheduler checks it will see that there is a pending execution and it will run Install Airflow First of all, make sure python2. The data will be staged into Hive and we’ll run Hive queries to populate the Data Vault model. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. Let’s dive into how we can actually use SQL to insert data into a database. string: n/a: yes: db_subnet_group_name: db subnet group, if assigned, db will create in that subnet, default create in default vpc: string "" no: db_username: PostgreSQL username As mentioned in the documentation, you may try Airflow with SQLite as its backend database for experimenting and learning if you don’t want to initiate a real database (mainly MySQL or PostgreSQL). zip file and run the . connect(host="localhost",database="mydb", user="postgres", password="postgres") if conn is not None: print('Connection established to PostgreSQL. get_batch (batch_kwargs_file, < insert name of your expectation suite >) # Run the validation results = context. pyenv/versions/2. Using this feature, one can ask Postgres to return essentially any value you want; returning the ID of the newly inserted row is just the tip of the iceberg. uuid4(). By default it’s a SQLite file (database), but for concurrent workloads one should use backend databases such as PostgreSQL. insert_row = PostgresOperator(task_id='insert_row', sql='INSERT INTO new_table VALUES(%s, %s, %s)', trigger_rule=TriggerRule. patch) at 2021-01-24 12:31:42 from Amit Langote <amitlangote09 at gmail. Configure PostgreSQL in AWS RDS. I've been trying to figure this out and I'm still scratching my head a bit and would appreciate your help. Calling ETL Helper scripts from Apache Airflow. postgres_operator task_id='insert_row', sql static _generate_insert_sql (table, values, target_fields, replace, ** kwargs) [source] ¶ Static helper method that generate the INSERT SQL statement. airflow. postgres_hook # -*- coding: utf-8 -*- # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. For instance, in the last example, you were instructed to get to the Postgres prompt by first switching to the postgres user and then running psql to open the Postgres prompt. pem'. You Airflow is a platform to schedule and monitor workflows and in this post I will show you how to use it to extract the daily weather in New York from the OpenWeatherMap API, convert the temperature to Celsius and load the data in a simple PostgreSQL database. . Thus, the tasks should produce the same outcome on every re-run. format(table)) cur. for file in C:\Path\To\Files begin csvsql --db postgresql:///test --table fy09 --insert $file end. Run the Postgres Container. 10. Thus, many changes will be put into the WAL stream. It is good practice to explicitly create topics so that we can control the number of partitions and replication factor as we may not want to stick with the default values. com. 
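The DataFrame-to-tuples step shown in that custom operator also works standalone: itertuples() yields plain tuples that insert_rows() writes in batches. A self-contained sketch; the file path, table name, and connection id are assumptions.

import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook

df = pd.read_csv("/path/to/processed_log.csv")
rows = list(df.itertuples(index=False, name=None))   # one plain tuple per DataFrame row

hook = PostgresHook(postgres_conn_id="postgres_default")
hook.insert_rows(
    table="processed_log",
    rows=rows,
    target_fields=list(df.columns),   # keep the column order explicit
)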
To carry out these tasks, PostgreSQL runs a vacuum job. With that, postgres is installed on your server. PostgreSQL, is simply is one of the most rock-solid, robust, highly-scalable, and Open Source RDBMS. a different database) the concept is still valid and you can easily adapt the solution for your needs. Therefore, it must be accessible by the PostgreSQL server machine. hooks. me/e/14/e3q8) web_1 It is recommended to use INSERT INTO with a column list as much as possible. Modify the database instance to associate to this customized parameter group. Contents 1 Principles 3 2 Beyond the Horizon 5 3 Content 7 3. msi file. Notice that you can execute multiple requests at once by passing a list of SQL requests. whl. OperationalError) could not translate host name "airflow-dev-postgresql" to address: Temporary failure in name resolution Install Postgres. We’re going to start a postgres instance that contains the airflow database and another database for a (postgres port) of the adventureworks database often used by Microsoft. Bases: airflow. There are two different cases for I/O queries: Perfect! we now have related tables containing various data types, which can be joined using their keys. In this course you are going to learn everything you need to start using Apache Airflow through theory and pratical videos. get_conn() as conn: with conn. . Open the Postgres app: In order to start the server, click the start button. 3. sudo su - postgres pg_dump postgres > postgres_db. Required. The airflow_db connection is generated by default. execute ("insert into %s values (%% s, %% s)" % ext. postgres package. execute( "INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);", ('mock_row_content_1', 42) ) cur. Edit the Connection. operators. postgresql: existingSecret: airflow-postgresql redis: existingSecret: airflow-redis Docs (Database) - External Database. The REPLACE variant is specific to MySQL syntax. whl file for psycopg2 from ~Gohlke and then install it by running pip install /path/to/<downloaded file>. a Environment: Docker. - Postgresql trick -How to insert single qoute when trying to INSERT INTO. 7/site-packages/airflow/models. Here, we have two databases “yourDatabaseName1” and “yourDatabaseName2” −insert i List of AWS subnet ids for Airflow server and database: vpc_id: AWS VPC in which to create the Airflow server: security_group_id: AWS Security group in which to create the Airflow server: db_password: Password for the PostgreSQL instance: fernet_key: Key for encrypting data in the database - see Airflow docs SQL syntax such as "INSERT OR UPDATE", "UPSERT" or "INSERT ON CONFLICT" may be more efficient, but the the exact commands depend on the target database type. test a| -| But when I create a table with confere user, postgres cannot see its content because it gets a owner error: Apache airflow is developed to manage complex workflow but when you have just started with airflow is important to understand and implement the tiny use case as a task and when these tasks combine in a flow or pipeline then you will see the benefit of airflow. PostgreSQL Operators. Next we query data from our on-premise PostgreSQL database, we can do this by creating a function that will use the COPY command to store a piece of our table or the full table into a local file. That is why we call the action is upsert (the combination of update or insert). users(col1) VALUES('record3');" You now have a Postgres database ready to be replicated! 
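Pulling a table, or a slice of it, out of a source PostgreSQL database into a local file is easiest with COPY ... TO STDOUT, which psycopg2 exposes through copy_expert. A sketch; connection details, table, and output path are assumptions.

import psycopg2

conn = psycopg2.connect(host="source-db.example.com", dbname="mydb",
                        user="postgres", password="postgres")
with conn, conn.cursor() as cur, open("/tmp/users_export.csv", "w") as f:
    cur.copy_expert("COPY (SELECT * FROM users) TO STDOUT WITH (FORMAT csv, HEADER true)", f)
conn.close()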
Connect the Postgres database PostgreSQL reads the WAL record of the first INSERT statement from the appropriate WAL segment file. read_csv('my_csv. Finally, once the task 3 is finished, task 4 creates a table corresponding to the data contained in “processed_log. The PostgreSQL connector allows you to access data from PostgreSQL based databases within Data Studio. All python code is Python 3. Import CSV file into a table using pgAdmin. Demonstration - Export data from Postgres to SQL Server. 1. echo ". This chart bootstraps an Apache Airflow deployment on a Kubernetes cluster using the Helm package manager. Below are the different PostgreSQL Operators, which are as follows: PostgreSQL connection string to be used by lakeFS secrets. That’s because Stitch enables the data engineer (or even a less technical Analytics Engineer ) to define and schedule data extracts and loads via a web interface hosted by a vendor. operators. These define how your airflow instance will connect to your redshift cluster. table_1', 3 :: SMALLINT); SELECT s. bak This command is actually a PostgreSQL client program, so it can be run from a remote system as long as that system has access to the database. Replace the REDIS-HOST placeholder with the DNS name of the Azure Cache for Redis service (defined by you in Step 1 at deployment time). yml >) # Create your batch_kwargs batch_kwargs_file = {"path": < insert path to your data file >, "datasource": "my_pandas_datasource"} # Create your batch (batch_kwargs + expectation suite) batch_file = context. Run in parallel (depends on ETL framework) SELECT s. . This allows me to embed the migrations into the binaries I build, ensuring the database migrations are versioned in SCM together with the logic that is used to interact with the FAQ: Using Sequences in PostgreSQL. dbapi_hook. On each of these runs, Airflow creates an instance of the DAG PostgreSQL is one of the most popular open source database. hooks. . 0. Works with most CI services. Why did I briefly see Postgres read-only transaction errors for my Hobby instance? Issue. Link: API to Postgres. I'd like to override this method in PostgresHook to implement the "INSERT sql = "INSERT INTO {0} {1} VALUES ({2})". Starting with version 9. In this project, we build an etl pipeline to fetch data from yelp API and insert it into the Postgres Database. airflow postgres insert