
Airflow 2.0 Migration Issue

Updated: Jan 14

When migrating from Apache Airflow 1.10.15 to 2.0.1, we ran into an issue: the scheduler crashed and the UI displayed the following error:

[2021-02-06 10:10:48,756] {scheduler_job.py:3423} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
    self._run_scheduler_loop()
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1474, in _do_scheduling
    self._create_dag_runs(query.all(), session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1557, in _create_dag_runs
    dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 171, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 227, in _add_dag_from_db
    raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'dag_name' not found in serialized_dag table

We traced the problem to a faulty step in the airflow db upgrade migration, which gave a column in the dag_code table the wrong data type. We wanted to keep the DAG run history from 1.10.15, so dropping and recreating the whole database was not an option. Instead, we dropped only the dag_code table and recreated it with the following SQL.
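The drop itself is not part of that script. A minimal sketch of this first step might look like the block below, assuming the table lives in the default dbo schema and the database is named airflow as in the script. The OBJECT_ID guard is an addition for illustration, so the statement is harmless if the table is already gone.

```sql
/* Change to the database name you use for Airflow */
USE [airflow]
GO

-- Drop only dag_code; dag_run and the other history tables are not touched.
-- The guard skips the drop when the table does not exist.
IF OBJECT_ID(N'dbo.dag_code', N'U') IS NOT NULL
    DROP TABLE [dbo].[dag_code];
GO
```

Taking a database backup before running a drop like this is a sensible precaution. With the old table gone, it can be recreated: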


/* Change to the database name you use for Airflow */
USE [airflow]
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
/* Lengths are assumed from Airflow's DagCode model (a 2000-character
   fileloc and an unbounded source_code); a bare varchar or nvarchar
   column in SQL Server defaults to length 1. */
CREATE TABLE [dbo].[dag_code](
    [fileloc_hash] [bigint] NOT NULL,
    [fileloc] [varchar](2000) NOT NULL,
    [source_code] [nvarchar](max) NOT NULL,
    [last_updated] [datetime] NOT NULL,
PRIMARY KEY CLUSTERED
(
    [fileloc_hash] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
GO

After we restarted Airflow, the errors went away and the DAG run history from the older version was retained.
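A few read-only queries can help confirm that outcome. The sketch below is not part of the original fix; it assumes the dbo schema and the standard Airflow table names dag_code, serialized_dag, and dag_run. The first query can also be run before the fix to see which data type the upgrade actually created.

```sql
/* Change to the database name you use for Airflow */
USE [airflow]
GO

-- Column types of dag_code; CHARACTER_MAXIMUM_LENGTH is -1 for (max) columns.
SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'dbo' AND TABLE_NAME = 'dag_code'
ORDER BY ORDINAL_POSITION;

-- Both counts should become non-zero once the scheduler has parsed the DAG files.
SELECT COUNT(*) AS dag_code_rows FROM [dbo].[dag_code];
SELECT COUNT(*) AS serialized_dag_rows FROM [dbo].[serialized_dag];

-- The run history carried over from 1.10.15 should still be present.
SELECT COUNT(*) AS dag_run_rows FROM [dbo].[dag_run];
GO
```

If dag_code_rows stays at zero after the scheduler has been running for a while, the scheduler log is the next place to look.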


